1package zmq4 2 3/* 4#cgo !windows pkg-config: libzmq 5#cgo windows CFLAGS: -I/usr/local/include 6#cgo windows LDFLAGS: -L/usr/local/lib -lzmq 7#include <zmq.h> 8#if ZMQ_VERSION_MINOR < 2 9#include <zmq_utils.h> 10#endif 11#include <stdlib.h> 12#include <string.h> 13#include "zmq4.h" 14 15int 16 zmq4_major = ZMQ_VERSION_MAJOR, 17 zmq4_minor = ZMQ_VERSION_MINOR, 18 zmq4_patch = ZMQ_VERSION_PATCH; 19 20#if ZMQ_VERSION_MINOR > 0 21// Version >= 4.1.x 22 23typedef struct { 24 uint16_t event; // id of the event as bitfield 25 int32_t value; // value is either error code, fd or reconnect interval 26} zmq_event_t; 27 28#else 29// Version == 4.0.x 30 31const char *zmq_msg_gets (zmq_msg_t *msg, const char *property) { 32 return NULL; 33} 34 35int zmq_has (const char *capability) { 36 return 0; 37} 38 39#if ZMQ_VERSION_PATCH < 5 40// Version < 4.0.5 41 42int zmq_proxy_steerable (const void *frontend, const void *backend, const void *capture, const void *control) { 43 return -1; 44} 45 46#endif // Version < 4.0.5 47 48#endif // Version == 4.0.x 49 50void zmq4_get_event40(zmq_msg_t *msg, int *ev, int *val) { 51 zmq_event_t event; 52 const char* data = (char*)zmq_msg_data(msg); 53 memcpy(&(event.event), data, sizeof(event.event)); 54 memcpy(&(event.value), data+sizeof(event.event), sizeof(event.value)); 55 *ev = (int)(event.event); 56 *val = (int)(event.value); 57} 58void zmq4_get_event41(zmq_msg_t *msg, int *ev, int *val) { 59 uint8_t *data = (uint8_t *) zmq_msg_data (msg); 60 uint16_t event = *(uint16_t *) (data); 61 *ev = (int)event; 62 *val = (int)(*(uint32_t *) (data + 2)); 63} 64void *zmq4_memcpy(void *dest, const void *src, size_t n) { 65 return memcpy(dest, src, n); 66} 67*/ 68import "C" 69 70import ( 71 "errors" 72 "fmt" 73 "runtime" 74 "strings" 75 "unsafe" 76) 77 78var ( 79 defaultCtx *Context 80 81 major, minor, patch int 82 83 ErrorContextClosed = errors.New("Context is closed") 84 ErrorSocketClosed = errors.New("Socket is closed") 85 ErrorMoreExpected = errors.New("More expected") 86 ErrorNotImplemented405 = errors.New("Not implemented, requires 0MQ version 4.0.5") 87 ErrorNotImplemented41 = errors.New("Not implemented, requires 0MQ version 4.1") 88 ErrorNotImplemented42 = errors.New("Not implemented, requires 0MQ version 4.2") 89 ErrorNotImplementedWindows = errors.New("Not implemented on Windows") 90 ErrorNoSocket = errors.New("No such socket") 91 92 initVersionError error 93 initContextError error 94 95 // api compatibility, based on changes in header files 96 api = map[[2]int]int{ 97 [2]int{0, 0}: 1, 98 [2]int{0, 1}: 2, 99 [2]int{0, 2}: 3, 100 [2]int{0, 3}: 3, 101 [2]int{0, 4}: 3, 102 [2]int{0, 5}: 4, 103 [2]int{0, 6}: 4, 104 [2]int{0, 7}: 4, 105 [2]int{0, 8}: 4, 106 [2]int{0, 9}: 4, 107 [2]int{0, 10}: 4, 108 [2]int{1, 0}: 5, 109 [2]int{1, 1}: 6, 110 [2]int{1, 2}: 6, 111 [2]int{1, 3}: 6, 112 [2]int{1, 4}: 6, 113 [2]int{1, 5}: 6, 114 [2]int{1, 6}: 7, 115 [2]int{1, 7}: 7, 116 [2]int{1, 8}: 7, 117 [2]int{2, 0}: 8, 118 [2]int{2, 1}: 9, 119 [2]int{2, 2}: 9, 120 [2]int{2, 3}: 9, 121 [2]int{2, 4}: 9, 122 [2]int{2, 5}: 9, 123 [2]int{3, 0}: 10, 124 [2]int{3, 1}: 10, 125 [2]int{3, 2}: 10, 126 [2]int{3, 3}: 10, 127 [2]int{3, 4}: 10, 128 } 129) 130 131func init() { 132 major, minor, patch = Version() 133 if major != 4 { 134 initVersionError = fmt.Errorf("Using zmq4 with ZeroMQ major version %d", major) 135 return 136 } 137 138 v, ok1 := api[[2]int{minor, patch}] 139 w, ok2 := api[[2]int{int(C.zmq4_minor), int(C.zmq4_patch)}] 140 if v != w || !ok1 || !ok2 { 141 if major != int(C.zmq4_major) || minor != int(C.zmq4_minor) || patch != int(C.zmq4_patch) { 142 initVersionError = 143 fmt.Errorf( 144 "zmq4 was compiled with ZeroMQ version %d.%d.%d, but the runtime links with version %d.%d.%d", 145 int(C.zmq4_major), int(C.zmq4_minor), int(C.zmq4_patch), 146 major, minor, patch) 147 return 148 } 149 } 150 151 var err error 152 defaultCtx = &Context{} 153 defaultCtx.ctx, err = C.zmq4_ctx_new() 154 if defaultCtx.ctx == nil { 155 initContextError = fmt.Errorf("Init of ZeroMQ context failed: %v", errget(err)) 156 return 157 } 158 defaultCtx.opened = true 159 defaultCtx.retryEINTR = true 160} 161 162//. Util 163 164// Report 0MQ library version. 165func Version() (major, minor, patch int) { 166 if initVersionError != nil { 167 return 0, 0, 0 168 } 169 var maj, min, pat C.int 170 C.zmq_version(&maj, &min, &pat) 171 return int(maj), int(min), int(pat) 172} 173 174// Get 0MQ error message string. 175func Error(e int) string { 176 return C.GoString(C.zmq_strerror(C.int(e))) 177} 178 179//. Context 180 181const ( 182 MaxSocketsDflt = int(C.ZMQ_MAX_SOCKETS_DFLT) 183 IoThreadsDflt = int(C.ZMQ_IO_THREADS_DFLT) 184) 185 186/* 187A context that is not the default context. 188*/ 189type Context struct { 190 ctx unsafe.Pointer 191 retryEINTR bool 192 opened bool 193 err error 194} 195 196// Create a new context. 197func NewContext() (ctx *Context, err error) { 198 if initVersionError != nil { 199 return nil, initVersionError 200 } 201 ctx = &Context{} 202 c, e := C.zmq4_ctx_new() 203 if c == nil { 204 err = errget(e) 205 ctx.err = err 206 } else { 207 ctx.ctx = c 208 ctx.retryEINTR = true 209 ctx.opened = true 210 runtime.SetFinalizer(ctx, (*Context).Term) 211 } 212 return 213} 214 215/* 216Terminates the default context. 217 218For linger behavior, see: http://api.zeromq.org/4-1:zmq-ctx-term 219*/ 220func Term() error { 221 if initVersionError != nil { 222 return initVersionError 223 } 224 if initContextError != nil { 225 return initContextError 226 } 227 return defaultCtx.Term() 228} 229 230/* 231Terminates the context. 232 233For linger behavior, see: http://api.zeromq.org/4-1:zmq-ctx-term 234*/ 235func (ctx *Context) Term() error { 236 if ctx.opened { 237 ctx.opened = false 238 var n C.int 239 var err error 240 for { 241 n, err = C.zmq4_ctx_term(ctx.ctx) 242 if n == 0 || !ctx.retry(err) { 243 break 244 } 245 } 246 if n != 0 { 247 ctx.err = errget(err) 248 } 249 } 250 return ctx.err 251} 252 253func getOption(ctx *Context, o C.int) (int, error) { 254 if !ctx.opened { 255 return 0, ErrorContextClosed 256 } 257 nc, err := C.zmq4_ctx_get(ctx.ctx, o) 258 n := int(nc) 259 if n < 0 { 260 return n, errget(err) 261 } 262 return n, nil 263} 264 265// Returns the size of the 0MQ thread pool in the default context. 266func GetIoThreads() (int, error) { 267 if initVersionError != nil { 268 return 0, initVersionError 269 } 270 if initContextError != nil { 271 return 0, initContextError 272 } 273 return defaultCtx.GetIoThreads() 274} 275 276// Returns the size of the 0MQ thread pool. 277func (ctx *Context) GetIoThreads() (int, error) { 278 return getOption(ctx, C.ZMQ_IO_THREADS) 279} 280 281// Returns the maximum number of sockets allowed in the default context. 282func GetMaxSockets() (int, error) { 283 if initVersionError != nil { 284 return 0, initVersionError 285 } 286 if initContextError != nil { 287 return 0, initContextError 288 } 289 return defaultCtx.GetMaxSockets() 290} 291 292// Returns the maximum number of sockets allowed. 293func (ctx *Context) GetMaxSockets() (int, error) { 294 return getOption(ctx, C.ZMQ_MAX_SOCKETS) 295} 296 297/* 298Returns the maximum message size in the default context. 299 300Returns ErrorNotImplemented42 with ZeroMQ version < 4.2 301*/ 302func GetMaxMsgsz() (int, error) { 303 if initVersionError != nil { 304 return 0, initVersionError 305 } 306 if initContextError != nil { 307 return 0, initContextError 308 } 309 return defaultCtx.GetMaxMsgsz() 310} 311 312/* 313Returns the maximum message size. 314 315Returns ErrorNotImplemented42 with ZeroMQ version < 4.2 316*/ 317func (ctx *Context) GetMaxMsgsz() (int, error) { 318 if minor < 2 { 319 return 0, ErrorNotImplemented42 320 } 321 return getOption(ctx, C.ZMQ_MAX_MSGSZ) 322} 323 324// Returns the IPv6 option in the default context. 325func GetIpv6() (bool, error) { 326 if initVersionError != nil { 327 return false, initVersionError 328 } 329 if initContextError != nil { 330 return false, initContextError 331 } 332 return defaultCtx.GetIpv6() 333} 334 335// Returns the IPv6 option. 336func (ctx *Context) GetIpv6() (bool, error) { 337 i, e := getOption(ctx, C.ZMQ_IPV6) 338 if i == 0 { 339 return false, e 340 } 341 return true, e 342} 343 344/* 345Returns the blocky setting in the default context. 346 347Returns ErrorNotImplemented42 with ZeroMQ version < 4.2 348*/ 349func GetBlocky() (bool, error) { 350 if initVersionError != nil { 351 return false, initVersionError 352 } 353 if initContextError != nil { 354 return false, initContextError 355 } 356 return defaultCtx.GetBlocky() 357} 358 359/* 360Returns the blocky setting. 361 362Returns ErrorNotImplemented42 with ZeroMQ version < 4.2 363*/ 364func (ctx *Context) GetBlocky() (bool, error) { 365 if minor < 2 { 366 return false, ErrorNotImplemented42 367 } 368 i, e := getOption(ctx, C.ZMQ_BLOCKY) 369 if i == 0 { 370 return false, e 371 } 372 return true, e 373} 374 375/* 376Returns the retry after EINTR setting in the default context. 377*/ 378func GetRetryAfterEINTR() bool { 379 return defaultCtx.GetRetryAfterEINTR() 380} 381 382/* 383Returns the retry after EINTR setting. 384*/ 385func (ctx *Context) GetRetryAfterEINTR() bool { 386 return ctx.retryEINTR 387} 388 389func setOption(ctx *Context, o C.int, n int) error { 390 if !ctx.opened { 391 return ErrorContextClosed 392 } 393 i, err := C.zmq4_ctx_set(ctx.ctx, o, C.int(n)) 394 if int(i) != 0 { 395 return errget(err) 396 } 397 return nil 398} 399 400/* 401Specifies the size of the 0MQ thread pool to handle I/O operations in 402the default context. If your application is using only the inproc 403transport for messaging you may set this to zero, otherwise set it to at 404least one. This option only applies before creating any sockets. 405 406Default value: 1 407*/ 408func SetIoThreads(n int) error { 409 if initVersionError != nil { 410 return initVersionError 411 } 412 if initContextError != nil { 413 return initContextError 414 } 415 return defaultCtx.SetIoThreads(n) 416} 417 418/* 419Specifies the size of the 0MQ thread pool to handle I/O operations. If 420your application is using only the inproc transport for messaging you 421may set this to zero, otherwise set it to at least one. This option only 422applies before creating any sockets. 423 424Default value: 1 425*/ 426func (ctx *Context) SetIoThreads(n int) error { 427 return setOption(ctx, C.ZMQ_IO_THREADS, n) 428} 429 430/* 431Sets the scheduling policy for default context’s thread pool. 432 433This option requires ZeroMQ version 4.1, and is not available on Windows. 434 435Supported values for this option can be found in sched.h file, or at 436http://man7.org/linux/man-pages/man2/sched_setscheduler.2.html 437 438This option only applies before creating any sockets on the context. 439 440Default value: -1 441 442Returns ErrorNotImplemented41 with ZeroMQ version < 4.1 443 444Returns ErrorNotImplementedWindows on Windows 445*/ 446func SetThreadSchedPolicy(n int) error { 447 if initVersionError != nil { 448 return initVersionError 449 } 450 if initContextError != nil { 451 return initContextError 452 } 453 return defaultCtx.SetThreadSchedPolicy(n) 454} 455 456/* 457Sets scheduling priority for default context’s thread pool. 458 459This option requires ZeroMQ version 4.1, and is not available on Windows. 460 461Supported values for this option depend on chosen scheduling policy. 462Details can be found in sched.h file, or at 463http://man7.org/linux/man-pages/man2/sched_setscheduler.2.html 464 465This option only applies before creating any sockets on the context. 466 467Default value: -1 468 469Returns ErrorNotImplemented41 with ZeroMQ version < 4.1 470 471Returns ErrorNotImplementedWindows on Windows 472*/ 473func SetThreadPriority(n int) error { 474 if initVersionError != nil { 475 return initVersionError 476 } 477 if initContextError != nil { 478 return initContextError 479 } 480 return defaultCtx.SetThreadPriority(n) 481} 482 483/* 484Set maximum message size in the default context. 485 486Default value: INT_MAX 487 488Returns ErrorNotImplemented42 with ZeroMQ version < 4.2 489*/ 490func SetMaxMsgsz(n int) error { 491 if initVersionError != nil { 492 return initVersionError 493 } 494 if initContextError != nil { 495 return initContextError 496 } 497 return defaultCtx.SetMaxMsgsz(n) 498} 499 500/* 501Set maximum message size. 502 503Default value: INT_MAX 504 505Returns ErrorNotImplemented42 with ZeroMQ version < 4.2 506*/ 507func (ctx *Context) SetMaxMsgsz(n int) error { 508 if minor < 2 { 509 return ErrorNotImplemented42 510 } 511 return setOption(ctx, C.ZMQ_MAX_MSGSZ, n) 512} 513 514/* 515Sets the maximum number of sockets allowed in the default context. 516 517Default value: 1024 518*/ 519func SetMaxSockets(n int) error { 520 if initVersionError != nil { 521 return initVersionError 522 } 523 if initContextError != nil { 524 return initContextError 525 } 526 return defaultCtx.SetMaxSockets(n) 527} 528 529/* 530Sets the maximum number of sockets allowed. 531 532Default value: 1024 533*/ 534func (ctx *Context) SetMaxSockets(n int) error { 535 return setOption(ctx, C.ZMQ_MAX_SOCKETS, n) 536} 537 538/* 539Sets the IPv6 value for all sockets created in the default context from this point onwards. 540A value of true means IPv6 is enabled, while false means the socket will use only IPv4. 541When IPv6 is enabled, a socket will connect to, or accept connections from, both IPv4 and IPv6 hosts. 542 543Default value: false 544*/ 545func SetIpv6(i bool) error { 546 if initVersionError != nil { 547 return initVersionError 548 } 549 if initContextError != nil { 550 return initContextError 551 } 552 return defaultCtx.SetIpv6(i) 553} 554 555/* 556Sets the IPv6 value for all sockets created in the context from this point onwards. 557A value of true means IPv6 is enabled, while false means the socket will use only IPv4. 558When IPv6 is enabled, a socket will connect to, or accept connections from, both IPv4 and IPv6 hosts. 559 560Default value: false 561*/ 562func (ctx *Context) SetIpv6(i bool) error { 563 n := 0 564 if i { 565 n = 1 566 } 567 return setOption(ctx, C.ZMQ_IPV6, n) 568} 569 570/* 571Sets the blocky behavior in the default context. 572 573See: http://api.zeromq.org/4-2:zmq-ctx-set#toc3 574 575Returns ErrorNotImplemented42 with ZeroMQ version < 4.2 576*/ 577func SetBlocky(i bool) error { 578 if initVersionError != nil { 579 return initVersionError 580 } 581 if initContextError != nil { 582 return initContextError 583 } 584 return defaultCtx.SetBlocky(i) 585} 586 587/* 588Sets the blocky behavior. 589 590See: http://api.zeromq.org/4-2:zmq-ctx-set#toc3 591 592Returns ErrorNotImplemented42 with ZeroMQ version < 4.2 593*/ 594func (ctx *Context) SetBlocky(i bool) error { 595 if minor < 2 { 596 return ErrorNotImplemented42 597 } 598 n := 0 599 if i { 600 n = 1 601 } 602 return setOption(ctx, C.ZMQ_BLOCKY, n) 603} 604 605/* 606Sets the retry after EINTR setting in the default context. 607 608Initital value is true. 609*/ 610func SetRetryAfterEINTR(retry bool) { 611 defaultCtx.SetRetryAfterEINTR(retry) 612} 613 614/* 615Sets the retry after EINTR setting. 616 617Initital value is true. 618*/ 619func (ctx *Context) SetRetryAfterEINTR(retry bool) { 620 ctx.retryEINTR = retry 621} 622 623//. Sockets 624 625// Specifies the type of a socket, used by NewSocket() 626type Type int 627 628const ( 629 // Constants for NewSocket() 630 // See: http://api.zeromq.org/4-1:zmq-socket#toc3 631 REQ = Type(C.ZMQ_REQ) 632 REP = Type(C.ZMQ_REP) 633 DEALER = Type(C.ZMQ_DEALER) 634 ROUTER = Type(C.ZMQ_ROUTER) 635 PUB = Type(C.ZMQ_PUB) 636 SUB = Type(C.ZMQ_SUB) 637 XPUB = Type(C.ZMQ_XPUB) 638 XSUB = Type(C.ZMQ_XSUB) 639 PUSH = Type(C.ZMQ_PUSH) 640 PULL = Type(C.ZMQ_PULL) 641 PAIR = Type(C.ZMQ_PAIR) 642 STREAM = Type(C.ZMQ_STREAM) 643) 644 645/* 646Socket type as string. 647*/ 648func (t Type) String() string { 649 switch t { 650 case REQ: 651 return "REQ" 652 case REP: 653 return "REP" 654 case DEALER: 655 return "DEALER" 656 case ROUTER: 657 return "ROUTER" 658 case PUB: 659 return "PUB" 660 case SUB: 661 return "SUB" 662 case XPUB: 663 return "XPUB" 664 case XSUB: 665 return "XSUB" 666 case PUSH: 667 return "PUSH" 668 case PULL: 669 return "PULL" 670 case PAIR: 671 return "PAIR" 672 case STREAM: 673 return "STREAM" 674 } 675 return "<INVALID>" 676} 677 678// Used by (*Socket)Send() and (*Socket)Recv() 679type Flag int 680 681const ( 682 // Flags for (*Socket)Send(), (*Socket)Recv() 683 // For Send, see: http://api.zeromq.org/4-1:zmq-send#toc2 684 // For Recv, see: http://api.zeromq.org/4-1:zmq-msg-recv#toc2 685 DONTWAIT = Flag(C.ZMQ_DONTWAIT) 686 SNDMORE = Flag(C.ZMQ_SNDMORE) 687) 688 689/* 690Socket flag as string. 691*/ 692func (f Flag) String() string { 693 ff := make([]string, 0) 694 if f&DONTWAIT != 0 { 695 ff = append(ff, "DONTWAIT") 696 } 697 if f&SNDMORE != 0 { 698 ff = append(ff, "SNDMORE") 699 } 700 if len(ff) == 0 { 701 return "<NONE>" 702 } 703 return strings.Join(ff, "|") 704} 705 706// Used by (*Socket)Monitor() and (*Socket)RecvEvent() 707type Event int 708 709const ( 710 // Flags for (*Socket)Monitor() and (*Socket)RecvEvent() 711 // See: http://api.zeromq.org/4-3:zmq-socket-monitor#toc3 712 EVENT_ALL = Event(C.ZMQ_EVENT_ALL) 713 EVENT_CONNECTED = Event(C.ZMQ_EVENT_CONNECTED) 714 EVENT_CONNECT_DELAYED = Event(C.ZMQ_EVENT_CONNECT_DELAYED) 715 EVENT_CONNECT_RETRIED = Event(C.ZMQ_EVENT_CONNECT_RETRIED) 716 EVENT_LISTENING = Event(C.ZMQ_EVENT_LISTENING) 717 EVENT_BIND_FAILED = Event(C.ZMQ_EVENT_BIND_FAILED) 718 EVENT_ACCEPTED = Event(C.ZMQ_EVENT_ACCEPTED) 719 EVENT_ACCEPT_FAILED = Event(C.ZMQ_EVENT_ACCEPT_FAILED) 720 EVENT_CLOSED = Event(C.ZMQ_EVENT_CLOSED) 721 EVENT_CLOSE_FAILED = Event(C.ZMQ_EVENT_CLOSE_FAILED) 722 EVENT_DISCONNECTED = Event(C.ZMQ_EVENT_DISCONNECTED) 723 EVENT_MONITOR_STOPPED = Event(C.ZMQ_EVENT_MONITOR_STOPPED) 724 EVENT_HANDSHAKE_FAILED_NO_DETAIL = Event(C.ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL) 725 EVENT_HANDSHAKE_SUCCEEDED = Event(C.ZMQ_EVENT_HANDSHAKE_SUCCEEDED) 726 EVENT_HANDSHAKE_FAILED_PROTOCOL = Event(C.ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL) 727 EVENT_HANDSHAKE_FAILED_AUTH = Event(C.ZMQ_EVENT_HANDSHAKE_FAILED_AUTH) 728) 729 730/* 731Socket event as string. 732*/ 733func (e Event) String() string { 734 if e == EVENT_ALL { 735 return "EVENT_ALL" 736 } 737 ee := make([]string, 0) 738 if e&EVENT_CONNECTED != 0 { 739 ee = append(ee, "EVENT_CONNECTED") 740 } 741 if e&EVENT_CONNECT_DELAYED != 0 { 742 ee = append(ee, "EVENT_CONNECT_DELAYED") 743 } 744 if e&EVENT_CONNECT_RETRIED != 0 { 745 ee = append(ee, "EVENT_CONNECT_RETRIED") 746 } 747 if e&EVENT_LISTENING != 0 { 748 ee = append(ee, "EVENT_LISTENING") 749 } 750 if e&EVENT_BIND_FAILED != 0 { 751 ee = append(ee, "EVENT_BIND_FAILED") 752 } 753 if e&EVENT_ACCEPTED != 0 { 754 ee = append(ee, "EVENT_ACCEPTED") 755 } 756 if e&EVENT_ACCEPT_FAILED != 0 { 757 ee = append(ee, "EVENT_ACCEPT_FAILED") 758 } 759 if e&EVENT_CLOSED != 0 { 760 ee = append(ee, "EVENT_CLOSED") 761 } 762 if e&EVENT_CLOSE_FAILED != 0 { 763 ee = append(ee, "EVENT_CLOSE_FAILED") 764 } 765 if e&EVENT_DISCONNECTED != 0 { 766 ee = append(ee, "EVENT_DISCONNECTED") 767 } 768 if minor >= 3 { 769 if e&EVENT_HANDSHAKE_FAILED_NO_DETAIL != 0 { 770 ee = append(ee, "EVENT_HANDSHAKE_FAILED_NO_DETAIL") 771 } 772 if e&EVENT_HANDSHAKE_SUCCEEDED != 0 { 773 ee = append(ee, "EVENT_HANDSHAKE_SUCCEEDED") 774 } 775 if e&EVENT_HANDSHAKE_FAILED_PROTOCOL != 0 { 776 ee = append(ee, "EVENT_HANDSHAKE_FAILED_PROTOCOL") 777 } 778 if e&EVENT_HANDSHAKE_FAILED_AUTH != 0 { 779 ee = append(ee, "EVENT_HANDSHAKE_FAILED_AUTH") 780 } 781 } 782 if len(ee) == 0 { 783 return "<NONE>" 784 } 785 return strings.Join(ee, "|") 786} 787 788// Used by (soc *Socket)GetEvents() 789type State int 790 791const ( 792 // Flags for (*Socket)GetEvents() 793 // See: http://api.zeromq.org/4-1:zmq-getsockopt#toc8 794 POLLIN = State(C.ZMQ_POLLIN) 795 POLLOUT = State(C.ZMQ_POLLOUT) 796) 797 798/* 799Socket state as string. 800*/ 801func (s State) String() string { 802 ss := make([]string, 0) 803 if s&POLLIN != 0 { 804 ss = append(ss, "POLLIN") 805 } 806 if s&POLLOUT != 0 { 807 ss = append(ss, "POLLOUT") 808 } 809 if len(ss) == 0 { 810 return "<NONE>" 811 } 812 return strings.Join(ss, "|") 813} 814 815// Specifies the security mechanism, used by (*Socket)GetMechanism() 816type Mechanism int 817 818const ( 819 // Constants for (*Socket)GetMechanism() 820 // See: http://api.zeromq.org/4-1:zmq-getsockopt#toc22 821 NULL = Mechanism(C.ZMQ_NULL) 822 PLAIN = Mechanism(C.ZMQ_PLAIN) 823 CURVE = Mechanism(C.ZMQ_CURVE) 824 GSSAPI = Mechanism(C.ZMQ_GSSAPI) 825) 826 827/* 828Security mechanism as string. 829*/ 830func (m Mechanism) String() string { 831 switch m { 832 case NULL: 833 return "NULL" 834 case PLAIN: 835 return "PLAIN" 836 case CURVE: 837 return "CURVE" 838 case GSSAPI: 839 return "GSSAPI" 840 } 841 return "<INVALID>" 842} 843 844/* 845Socket functions starting with `Set` or `Get` are used for setting and 846getting socket options. 847*/ 848type Socket struct { 849 soc unsafe.Pointer 850 ctx *Context 851 opened bool 852 err error 853} 854 855/* 856Socket as string. 857*/ 858func (soc Socket) String() string { 859 if !soc.opened { 860 return "Socket(CLOSED)" 861 } 862 t, err := soc.GetType() 863 if err != nil { 864 return fmt.Sprintf("Socket(%v)", err) 865 } 866 i, err := soc.GetIdentity() 867 if err == nil && i != "" { 868 return fmt.Sprintf("Socket(%v,%q)", t, i) 869 } 870 return fmt.Sprintf("Socket(%v,%p)", t, soc.soc) 871} 872 873/* 874Create 0MQ socket in the default context. 875 876WARNING: 877The Socket is not thread safe. This means that you cannot access the same Socket 878from different goroutines without using something like a mutex. 879 880For a description of socket types, see: http://api.zeromq.org/4-1:zmq-socket#toc3 881*/ 882func NewSocket(t Type) (soc *Socket, err error) { 883 if initVersionError != nil { 884 return nil, initVersionError 885 } 886 if initContextError != nil { 887 return nil, initContextError 888 } 889 return defaultCtx.NewSocket(t) 890} 891 892/* 893Create 0MQ socket in the given context. 894 895WARNING: 896The Socket is not thread safe. This means that you cannot access the same Socket 897from different goroutines without using something like a mutex. 898 899For a description of socket types, see: http://api.zeromq.org/4-1:zmq-socket#toc3 900*/ 901func (ctx *Context) NewSocket(t Type) (soc *Socket, err error) { 902 soc = &Socket{} 903 if !ctx.opened { 904 return soc, ErrorContextClosed 905 } 906 var s unsafe.Pointer 907 var e error 908 for { 909 s, e = C.zmq4_socket(ctx.ctx, C.int(t)) 910 if s != nil || !ctx.retry(e) { 911 break 912 } 913 } 914 if s == nil { 915 err = errget(e) 916 soc.err = err 917 } else { 918 soc.soc = s 919 soc.ctx = ctx 920 soc.opened = true 921 runtime.SetFinalizer(soc, (*Socket).Close) 922 } 923 return 924} 925 926// If not called explicitly, the socket will be closed on garbage collection 927func (soc *Socket) Close() error { 928 if soc.opened { 929 soc.opened = false 930 var i C.int 931 var err error 932 for { 933 i, err = C.zmq4_close(soc.soc) 934 if i == 0 || !soc.ctx.retry(err) { 935 break 936 } 937 } 938 if int(i) != 0 { 939 soc.err = errget(err) 940 } 941 soc.soc = unsafe.Pointer(nil) 942 soc.ctx = nil 943 } 944 return soc.err 945} 946 947// Return the context associated with a socket 948func (soc *Socket) Context() (*Context, error) { 949 if !soc.opened { 950 return nil, ErrorSocketClosed 951 } 952 return soc.ctx, nil 953} 954 955/* 956Accept incoming connections on a socket. 957 958For a description of endpoint, see: http://api.zeromq.org/4-1:zmq-bind#toc2 959*/ 960func (soc *Socket) Bind(endpoint string) error { 961 if !soc.opened { 962 return ErrorSocketClosed 963 } 964 s := C.CString(endpoint) 965 defer C.free(unsafe.Pointer(s)) 966 var i C.int 967 var err error 968 for { 969 i, err = C.zmq4_bind(soc.soc, s) 970 if i == 0 || !soc.ctx.retry(err) { 971 break 972 } 973 } 974 if int(i) != 0 { 975 return errget(err) 976 } 977 return nil 978} 979 980/* 981Stop accepting connections on a socket. 982 983For a description of endpoint, see: http://api.zeromq.org/4-1:zmq-bind#toc2 984*/ 985func (soc *Socket) Unbind(endpoint string) error { 986 if !soc.opened { 987 return ErrorSocketClosed 988 } 989 s := C.CString(endpoint) 990 defer C.free(unsafe.Pointer(s)) 991 var i C.int 992 var err error 993 for { 994 i, err = C.zmq4_unbind(soc.soc, s) 995 if i == 0 || !soc.ctx.retry(err) { 996 break 997 } 998 } 999 if int(i) != 0 { 1000 return errget(err) 1001 } 1002 return nil 1003} 1004 1005/* 1006Create outgoing connection from socket. 1007 1008For a description of endpoint, see: http://api.zeromq.org/4-1:zmq-connect#toc2 1009*/ 1010func (soc *Socket) Connect(endpoint string) error { 1011 if !soc.opened { 1012 return ErrorSocketClosed 1013 } 1014 s := C.CString(endpoint) 1015 defer C.free(unsafe.Pointer(s)) 1016 var i C.int 1017 var err error 1018 for { 1019 i, err = C.zmq4_connect(soc.soc, s) 1020 if i == 0 || !soc.ctx.retry(err) { 1021 break 1022 } 1023 } 1024 if int(i) != 0 { 1025 return errget(err) 1026 } 1027 return nil 1028} 1029 1030/* 1031Disconnect a socket. 1032 1033For a description of endpoint, see: http://api.zeromq.org/4-1:zmq-disconnect#toc2 1034*/ 1035func (soc *Socket) Disconnect(endpoint string) error { 1036 if !soc.opened { 1037 return ErrorSocketClosed 1038 } 1039 s := C.CString(endpoint) 1040 defer C.free(unsafe.Pointer(s)) 1041 var i C.int 1042 var err error 1043 for { 1044 i, err = C.zmq4_disconnect(soc.soc, s) 1045 if i == 0 || !soc.ctx.retry(err) { 1046 break 1047 } 1048 } 1049 if int(i) != 0 { 1050 return errget(err) 1051 } 1052 return nil 1053} 1054 1055/* 1056Receive a message part from a socket. 1057 1058For a description of flags, see: http://api.zeromq.org/4-1:zmq-msg-recv#toc2 1059*/ 1060func (soc *Socket) Recv(flags Flag) (string, error) { 1061 b, err := soc.RecvBytes(flags) 1062 return string(b), err 1063} 1064 1065/* 1066Receive a message part from a socket. 1067 1068For a description of flags, see: http://api.zeromq.org/4-1:zmq-msg-recv#toc2 1069*/ 1070func (soc *Socket) RecvBytes(flags Flag) ([]byte, error) { 1071 if !soc.opened { 1072 return []byte{}, ErrorSocketClosed 1073 } 1074 var msg C.zmq_msg_t 1075 C.zmq_msg_init(&msg) 1076 defer C.zmq_msg_close(&msg) 1077 1078 var size C.int 1079 var err error 1080 for { 1081 size, err = C.zmq4_msg_recv(&msg, soc.soc, C.int(flags)) 1082 if size >= 0 || !soc.ctx.retry(err) { 1083 break 1084 } 1085 } 1086 if size < 0 { 1087 return []byte{}, errget(err) 1088 } 1089 if size == 0 { 1090 return []byte{}, nil 1091 } 1092 data := make([]byte, int(size)) 1093 C.zmq4_memcpy(unsafe.Pointer(&data[0]), C.zmq_msg_data(&msg), C.size_t(size)) 1094 return data, nil 1095} 1096 1097/* 1098Send a message part on a socket. 1099 1100For a description of flags, see: http://api.zeromq.org/4-1:zmq-send#toc2 1101*/ 1102func (soc *Socket) Send(data string, flags Flag) (int, error) { 1103 return soc.SendBytes([]byte(data), flags) 1104} 1105 1106/* 1107Send a message part on a socket. 1108 1109For a description of flags, see: http://api.zeromq.org/4-1:zmq-send#toc2 1110*/ 1111func (soc *Socket) SendBytes(data []byte, flags Flag) (int, error) { 1112 if !soc.opened { 1113 return 0, ErrorSocketClosed 1114 } 1115 d := data 1116 if len(data) == 0 { 1117 d = []byte{0} 1118 } 1119 var size C.int 1120 var err error 1121 for { 1122 size, err = C.zmq4_send(soc.soc, unsafe.Pointer(&d[0]), C.size_t(len(data)), C.int(flags)) 1123 if size >= 0 || !soc.ctx.retry(err) { 1124 break 1125 } 1126 } 1127 if size < 0 { 1128 return int(size), errget(err) 1129 } 1130 return int(size), nil 1131} 1132 1133/* 1134Register a monitoring callback. 1135 1136See: http://api.zeromq.org/4-1:zmq-socket-monitor#toc2 1137 1138WARNING: Closing a context with a monitoring callback will lead to random crashes. 1139This is a bug in the ZeroMQ library. 1140The monitoring callback has the same context as the socket it was created for. 1141 1142Example: 1143 1144 package main 1145 1146 import ( 1147 zmq "github.com/pebbe/zmq4" 1148 "log" 1149 "time" 1150 ) 1151 1152 func rep_socket_monitor(addr string) { 1153 s, err := zmq.NewSocket(zmq.PAIR) 1154 if err != nil { 1155 log.Fatalln(err) 1156 } 1157 err = s.Connect(addr) 1158 if err != nil { 1159 log.Fatalln(err) 1160 } 1161 for { 1162 a, b, c, err := s.RecvEvent(0) 1163 if err != nil { 1164 log.Println(err) 1165 break 1166 } 1167 log.Println(a, b, c) 1168 } 1169 s.Close() 1170 } 1171 1172 func main() { 1173 1174 // REP socket 1175 rep, err := zmq.NewSocket(zmq.REP) 1176 if err != nil { 1177 log.Fatalln(err) 1178 } 1179 1180 // REP socket monitor, all events 1181 err = rep.Monitor("inproc://monitor.rep", zmq.EVENT_ALL) 1182 if err != nil { 1183 log.Fatalln(err) 1184 } 1185 go rep_socket_monitor("inproc://monitor.rep") 1186 1187 // Generate an event 1188 rep.Bind("tcp://*:5555") 1189 if err != nil { 1190 log.Fatalln(err) 1191 } 1192 1193 // Allow some time for event detection 1194 time.Sleep(time.Second) 1195 1196 rep.Close() 1197 zmq.Term() 1198 } 1199*/ 1200func (soc *Socket) Monitor(addr string, events Event) error { 1201 if !soc.opened { 1202 return ErrorSocketClosed 1203 } 1204 if addr == "" { 1205 var i C.int 1206 var err error 1207 for { 1208 i, err = C.zmq4_socket_monitor(soc.soc, nil, C.int(events)) 1209 if i == 0 || !soc.ctx.retry(err) { 1210 break 1211 } 1212 } 1213 if i != 0 { 1214 return errget(err) 1215 } 1216 return nil 1217 } 1218 1219 s := C.CString(addr) 1220 defer C.free(unsafe.Pointer(s)) 1221 var i C.int 1222 var err error 1223 for { 1224 i, err = C.zmq4_socket_monitor(soc.soc, s, C.int(events)) 1225 if i == 0 || !soc.ctx.retry(err) { 1226 break 1227 } 1228 } 1229 if i != 0 { 1230 return errget(err) 1231 } 1232 return nil 1233} 1234 1235/* 1236Receive a message part from a socket interpreted as an event. 1237 1238For a description of flags, see: http://api.zeromq.org/4-1:zmq-msg-recv#toc2 1239 1240For a description of event_type, see: http://api.zeromq.org/4-1:zmq-socket-monitor#toc3 1241 1242For an example, see: func (*Socket) Monitor 1243*/ 1244func (soc *Socket) RecvEvent(flags Flag) (event_type Event, addr string, value int, err error) { 1245 if !soc.opened { 1246 return EVENT_ALL, "", 0, ErrorSocketClosed 1247 } 1248 var msg C.zmq_msg_t 1249 C.zmq_msg_init(&msg) 1250 defer C.zmq_msg_close(&msg) 1251 var size C.int 1252 var e error 1253 for { 1254 size, e = C.zmq4_msg_recv(&msg, soc.soc, C.int(flags)) 1255 if size >= 0 || !soc.ctx.retry(e) { 1256 break 1257 } 1258 } 1259 if size < 0 { 1260 err = errget(e) 1261 return 1262 } 1263 et := C.int(0) 1264 val := C.int(0) 1265 1266 if minor == 0 { 1267 C.zmq4_get_event40(&msg, &et, &val) 1268 } else { 1269 C.zmq4_get_event41(&msg, &et, &val) 1270 } 1271 more, e := soc.GetRcvmore() 1272 if e != nil { 1273 err = errget(e) 1274 return 1275 } 1276 if !more { 1277 err = ErrorMoreExpected 1278 return 1279 } 1280 addr, e = soc.Recv(flags) 1281 if e != nil { 1282 err = errget(e) 1283 return 1284 } 1285 1286 event_type = Event(et) 1287 value = int(val) 1288 1289 return 1290} 1291 1292/* 1293Start built-in ØMQ proxy 1294 1295See: http://api.zeromq.org/4-1:zmq-proxy#toc2 1296*/ 1297func Proxy(frontend, backend, capture *Socket) error { 1298 if !(frontend.opened && backend.opened && (capture == nil || capture.opened)) { 1299 return ErrorSocketClosed 1300 } 1301 var capt unsafe.Pointer 1302 if capture != nil { 1303 capt = capture.soc 1304 } 1305 var err error 1306 for { 1307 _, err = C.zmq4_proxy(frontend.soc, backend.soc, capt) 1308 if !frontend.ctx.retry(err) { 1309 break 1310 } 1311 } 1312 return errget(err) 1313} 1314 1315/* 1316Start built-in ØMQ proxy with PAUSE/RESUME/TERMINATE control flow 1317 1318Returns ErrorNotImplemented405 with ZeroMQ version < 4.0.5 1319 1320See: http://api.zeromq.org/4-1:zmq-proxy-steerable#toc2 1321*/ 1322func ProxySteerable(frontend, backend, capture, control *Socket) error { 1323 if minor == 0 && patch < 5 { 1324 return ErrorNotImplemented405 1325 } 1326 if !(frontend.opened && backend.opened && (capture == nil || capture.opened) && (control == nil || control.opened)) { 1327 return ErrorSocketClosed 1328 } 1329 var capt, ctrl unsafe.Pointer 1330 if capture != nil { 1331 capt = capture.soc 1332 } 1333 if control != nil { 1334 ctrl = control.soc 1335 } 1336 var i C.int 1337 var err error 1338 for { 1339 i, err = C.zmq4_proxy_steerable(frontend.soc, backend.soc, capt, ctrl) 1340 if i >= 0 || !frontend.ctx.retry(err) { 1341 break 1342 } 1343 } 1344 if i < 0 { 1345 return errget(err) 1346 } 1347 return nil 1348} 1349 1350//. CURVE 1351 1352/* 1353Encode a binary key as Z85 printable text 1354 1355See: http://api.zeromq.org/4-1:zmq-z85-encode 1356*/ 1357func Z85encode(data string) string { 1358 if initVersionError != nil { 1359 return initVersionError.Error() 1360 } 1361 l1 := len(data) 1362 if l1%4 != 0 { 1363 panic("Z85encode: Length of data not a multiple of 4") 1364 } 1365 d := []byte(data) 1366 1367 l2 := 5 * l1 / 4 1368 dest := make([]byte, l2+1) 1369 1370 C.zmq_z85_encode((*C.char)(unsafe.Pointer(&dest[0])), (*C.uint8_t)(&d[0]), C.size_t(l1)) 1371 1372 return string(dest[:l2]) 1373} 1374 1375/* 1376Decode a binary key from Z85 printable text 1377 1378See: http://api.zeromq.org/4-1:zmq-z85-decode 1379*/ 1380func Z85decode(s string) string { 1381 if initVersionError != nil { 1382 return initVersionError.Error() 1383 } 1384 l1 := len(s) 1385 if l1%5 != 0 { 1386 panic("Z85decode: Length of Z85 string not a multiple of 5") 1387 } 1388 l2 := 4 * l1 / 5 1389 dest := make([]byte, l2) 1390 cs := C.CString(s) 1391 defer C.free(unsafe.Pointer(cs)) 1392 C.zmq_z85_decode((*C.uint8_t)(&dest[0]), cs) 1393 return string(dest) 1394} 1395 1396/* 1397Generate a new CURVE keypair 1398 1399See: http://api.zeromq.org/4-1:zmq-curve-keypair#toc2 1400*/ 1401func NewCurveKeypair() (z85_public_key, z85_secret_key string, err error) { 1402 if initVersionError != nil { 1403 return "", "", initVersionError 1404 } 1405 var pubkey, seckey [41]byte 1406 if i, err := C.zmq4_curve_keypair((*C.char)(unsafe.Pointer(&pubkey[0])), (*C.char)(unsafe.Pointer(&seckey[0]))); i != 0 { 1407 return "", "", errget(err) 1408 } 1409 return string(pubkey[:40]), string(seckey[:40]), nil 1410} 1411 1412/* 1413Receive a message part with metadata. 1414 1415This requires ZeroMQ version 4.1.0. Lower versions will return the message part without metadata. 1416 1417The returned metadata map contains only those properties that exist on the message. 1418 1419For a description of flags, see: http://api.zeromq.org/4-1:zmq-msg-recv#toc2 1420 1421For a description of metadata, see: http://api.zeromq.org/4-1:zmq-msg-gets#toc3 1422*/ 1423func (soc *Socket) RecvWithMetadata(flags Flag, properties ...string) (msg string, metadata map[string]string, err error) { 1424 b, p, err := soc.RecvBytesWithMetadata(flags, properties...) 1425 return string(b), p, err 1426} 1427 1428/* 1429Receive a message part with metadata. 1430 1431This requires ZeroMQ version 4.1.0. Lower versions will return the message part without metadata. 1432 1433The returned metadata map contains only those properties that exist on the message. 1434 1435For a description of flags, see: http://api.zeromq.org/4-1:zmq-msg-recv#toc2 1436 1437For a description of metadata, see: http://api.zeromq.org/4-1:zmq-msg-gets#toc3 1438*/ 1439func (soc *Socket) RecvBytesWithMetadata(flags Flag, properties ...string) (msg []byte, metadata map[string]string, err error) { 1440 if !soc.opened { 1441 return []byte{}, map[string]string{}, ErrorSocketClosed 1442 } 1443 1444 metadata = make(map[string]string) 1445 1446 var m C.zmq_msg_t 1447 C.zmq_msg_init(&m) 1448 defer C.zmq_msg_close(&m) 1449 1450 var size C.int 1451 for { 1452 size, err = C.zmq4_msg_recv(&m, soc.soc, C.int(flags)) 1453 if size >= 0 || !soc.ctx.retry(err) { 1454 break 1455 } 1456 } 1457 if size < 0 { 1458 return []byte{}, metadata, errget(err) 1459 } 1460 1461 data := make([]byte, int(size)) 1462 if size > 0 { 1463 C.zmq4_memcpy(unsafe.Pointer(&data[0]), C.zmq_msg_data(&m), C.size_t(size)) 1464 } 1465 1466 if minor > 0 { 1467 for _, p := range properties { 1468 ps := C.CString(p) 1469 s := C.zmq4_msg_gets(&m, ps) 1470 if s != nil { 1471 metadata[p] = C.GoString(s) 1472 } 1473 C.free(unsafe.Pointer(ps)) 1474 } 1475 } 1476 return data, metadata, nil 1477} 1478 1479func hasCap(s string) (value bool) { 1480 if initVersionError != nil { 1481 return false 1482 } 1483 if minor < 1 { 1484 return false 1485 } 1486 cs := C.CString(s) 1487 defer C.free(unsafe.Pointer(cs)) 1488 return C.zmq_has(cs) != 0 1489} 1490 1491// Returns false for ZeroMQ version < 4.1.0 1492// 1493// Else: returns true if the library supports the ipc:// protocol 1494func HasIpc() bool { 1495 return hasCap("ipc") 1496} 1497 1498// Returns false for ZeroMQ version < 4.1.0 1499// 1500// Else: returns true if the library supports the pgm:// protocol 1501func HasPgm() bool { 1502 return hasCap("pgm") 1503} 1504 1505// Returns false for ZeroMQ version < 4.1.0 1506// 1507// Else: returns true if the library supports the tipc:// protocol 1508func HasTipc() bool { 1509 return hasCap("tipc") 1510} 1511 1512// Returns false for ZeroMQ version < 4.1.0 1513// 1514// Else: returns true if the library supports the norm:// protocol 1515func HasNorm() bool { 1516 return hasCap("norm") 1517} 1518 1519// Returns false for ZeroMQ version < 4.1.0 1520// 1521// Else: returns true if the library supports the CURVE security mechanism 1522func HasCurve() bool { 1523 return hasCap("curve") 1524} 1525 1526// Returns false for ZeroMQ version < 4.1.0 1527// 1528// Else: returns true if the library supports the GSSAPI security mechanism 1529func HasGssapi() bool { 1530 return hasCap("gssapi") 1531} 1532