#
# Module Generated by Template::Tiny on Thu Mar 14 04:05:17 UTC 2019
#

package ZMQ::FFI::ZMQ4_1::Socket;
$ZMQ::FFI::ZMQ4_1::Socket::VERSION = '1.17';
use FFI::Platypus;
use FFI::Platypus::Buffer;
use FFI::Platypus::Memory qw(malloc free memcpy);

use Carp qw(croak carp);
use Try::Tiny;

use ZMQ::FFI::ZMQ4_1::Raw;
use ZMQ::FFI::Custom::Raw;
use ZMQ::FFI::Constants qw(:all);
use ZMQ::FFI::Util qw(current_tid);

use Moo;
use namespace::clean;

with qw(
    ZMQ::FFI::SocketRole
    ZMQ::FFI::ErrorHelper
    ZMQ::FFI::Versioner
);

# Set to 1 after BUILD has run the two Raw::load calls, so that later
# constructions skip them.
my $FFI_LOADED;

# Moo construction hook.
#
# Runs the Raw::load calls on first construction, touches _zmq_msg_t so it is
# initialized, receives (and discards) messages for as long as has_pollin is
# true, and finally sets linger to 0.
sub BUILD {
    my ($self) = @_;

    unless ($FFI_LOADED) {
        ZMQ::FFI::Custom::Raw::load($self->soname);
        ZMQ::FFI::ZMQ4_1::Raw::load($self->soname);
        $FFI_LOADED = 1;
    }

    # force init zmq_msg_t
    $self->_zmq_msg_t;

    # ensure clean edge state
    while ( $self->has_pollin ) {
        $self->recv();
    }

    # set default linger
    $self->set_linger(0);
}


# $socket->connect($endpoint)
#
# Calls zmq_connect and hands its return code to check_error.
# Carps and returns early when the socket is closed (socket_ptr == -1);
# croaks with a usage message when $endpoint is false.
sub connect {
    my ($self, $endpoint) = @_;

    if ($self->socket_ptr == -1) {
        carp "Operation on closed socket";
        return;
    }

    unless ($endpoint) {
        croak 'usage: $socket->connect($endpoint)';
    }

    $self->check_error(
        'zmq_connect',
        zmq_connect($self->socket_ptr, $endpoint)
    );
}

# $socket->disconnect($endpoint)
#
# Calls zmq_disconnect and hands its return code to check_error.
# Carps and returns early when the socket is closed; croaks with a usage
# message when $endpoint is false.
sub disconnect {
    my ($self, $endpoint) = @_;

    if ($self->socket_ptr == -1) {
        carp "Operation on closed socket";
        return;
    }

    unless ($endpoint) {
        croak 'usage: $socket->disconnect($endpoint)';
    }

    $self->check_error(
        'zmq_disconnect',
        zmq_disconnect($self->socket_ptr, $endpoint)
    );
}

# $socket->bind($endpoint)
#
# Calls zmq_bind and hands its return code to check_error.
# Carps and returns early when the socket is closed; croaks with a usage
# message when $endpoint is false.
sub bind {
    my ($self, $endpoint) = @_;

    if ($self->socket_ptr == -1) {
        carp "Operation on closed socket";
        return;
    }

    unless ($endpoint) {
        croak 'usage: $socket->bind($endpoint)';
    }

    $self->check_error(
        'zmq_bind',
        zmq_bind($self->socket_ptr, $endpoint)
    );
}

# $socket->unbind($endpoint)
#
# Calls zmq_unbind and hands its return code to check_error.
# Carps and returns early when the socket is closed; croaks with a usage
# message when $endpoint is false.
sub unbind {
    my ($self, $endpoint) = @_;

    if ($self->socket_ptr == -1) {
        carp "Operation on closed socket";
        return;
    }

    unless ($endpoint) {
        croak 'usage: $socket->unbind($endpoint)';
    }

    $self->check_error(
        'zmq_unbind',
        zmq_unbind($self->socket_ptr, $endpoint)
    );
}

# $socket->send($data, $flags)
#
# Sends $data with zmq_send, using its length in bytes; $flags defaults to 0.
# Resets last_errno first. When zmq_send returns -1 the errno is stored in
# last_errno, fatal('zmq_send') is called if die_on_error is set, and the sub
# returns empty.
#
# Arguments are read straight from @_ rather than being copied.
sub send {
    # 0: self
    # 1: data
    # 2: flags

    if ($_[0]->socket_ptr == -1) {
        carp "Operation on closed socket";
        return;
    }

    $_[0]->{last_errno} = 0;

    # byte semantics are only wanted for the length computation
    use bytes;
    my $length = length($_[1]);
    no bytes;

    if ( -1 == zmq_send($_[0]->socket_ptr, $_[1], $length, ($_[2] // 0)) ) {
        $_[0]->{last_errno} = zmq_errno();

        if ($_[0]->die_on_error) {
            $_[0]->fatal('zmq_send');
        }

        return;
    }
}

# $socket->send_multipart($parts, $flags)
#
# Sends every element of the array ref $parts through send(): all but the
# last with ZMQ_SNDMORE or'ed into $flags, the last with $flags alone.
# Croaks with a usage message when $parts is missing or empty, and stops
# early (returning empty) as soon as has_error is true after a send.
sub send_multipart {
    # 0: self
    # 1: partsref
    # 2: flags

    if ($_[0]->socket_ptr == -1) {
        carp "Operation on closed socket";
        return;
    }

    my @parts = @{$_[1] // []};
    unless (@parts) {
        croak 'usage: send_multipart($parts, $flags)';
    }

    for my $i (0..$#parts-1) {
        $_[0]->send($parts[$i], ($_[2] // 0) | ZMQ_SNDMORE);

        # don't need to explicitly check die_on_error
        # since send would have exploded if it was true
        if ($_[0]->has_error) {
            return;
        }
    }

    $_[0]->send($parts[$#parts], $_[2] // 0);
}

# $socket->recv($flags)
#
# Receives one message part with zmq_msg_recv into the object's _zmq_msg_t;
# $flags defaults to 0. Returns the received bytes, or '' for a zero-length
# message. Resets last_errno first. When zmq_msg_recv returns -1 the errno is
# stored in last_errno, fatal('zmq_msg_recv') is called if die_on_error is
# set, and the sub returns empty.
sub recv {
    # 0: self
    # 1: flags

    if ($_[0]->socket_ptr == -1) {
        carp "Operation on closed socket";
        return;
    }

    $_[0]->{last_errno} = 0;

    # retval = msg size
    my $retval = zmq_msg_recv($_[0]->{"_zmq_msg_t"}, $_[0]->socket_ptr, $_[1] // 0);

    if ( $retval == -1 ) {
        $_[0]->{last_errno} = zmq_errno();

        if ($_[0]->die_on_error) {
            $_[0]->fatal('zmq_msg_recv');
        }

        return;
    }

    if ($retval) {
        return buffer_to_scalar(zmq_msg_data($_[0]->{"_zmq_msg_t"}), $retval);
    }

    return '';
}

# $socket->recv_multipart($flags)
#
# Calls recv() once, then keeps calling it for as long as the ZMQ_RCVMORE
# option reads true, and returns the collected parts as a list. Returns empty
# as soon as has_error is true after a recv.
sub recv_multipart {
    # 0: self
    # 1: flags

    if ($_[0]->socket_ptr == -1) {
        carp "Operation on closed socket";
        return;
    }

    my @parts = ( $_[0]->recv($_[1]) );

    if ($_[0]->has_error) {
        return;
    }

    # the option type used to read ZMQ_RCVMORE depends on the major version
    my $type = ($_[0]->version)[0] == 2 ? 'int64_t' : 'int';

    while ( $_[0]->get(ZMQ_RCVMORE, $type) ){
        push @parts, $_[0]->recv($_[1] // 0);

        # don't need to explicitly check die_on_error
        # since recv would have exploded if it was true
        if ($_[0]->has_error) {
            return;
        }
    }

    return @parts;
}

# Returns the ZMQ_FD option read as an 'int'.
sub get_fd {
    if ($_[0]->socket_ptr == -1) {
        carp "Operation on closed socket";
        return;
    }

    return $_[0]->get(ZMQ_FD, 'int');
}

# Returns the ZMQ_LINGER option read as an 'int'.
sub get_linger {
    if ($_[0]->socket_ptr == -1) {
        carp "Operation on closed socket";
        return;
    }

    return $_[0]->get(ZMQ_LINGER, 'int');
}

# $socket->set_linger($linger) - sets the ZMQ_LINGER option as an 'int'.
sub set_linger {
    my ($self, $linger) = @_;

    if ($self->socket_ptr == -1) {
        carp "Operation on closed socket";
        return;
    }

    $self->set(ZMQ_LINGER, 'int', $linger);
}

# Returns the ZMQ_IDENTITY option read as 'binary'.
sub get_identity {
    if ($_[0]->socket_ptr == -1) {
        carp "Operation on closed socket";
        return;
    }

    return $_[0]->get(ZMQ_IDENTITY, 'binary');
}

# $socket->set_identity($id) - sets the ZMQ_IDENTITY option as 'binary'.
sub set_identity {
    my ($self, $id) = @_;

    if ($self->socket_ptr == -1) {
        carp "Operation on closed socket";
        return;
    }

    $self->set(ZMQ_IDENTITY, 'binary', $id);
}

# $socket->subscribe($topic) - sets the ZMQ_SUBSCRIBE option as 'binary'.
sub subscribe {
    my ($self, $topic) = @_;

    if ($self->socket_ptr == -1) {
        carp "Operation on closed socket";
        return;
    }

    $self->set(ZMQ_SUBSCRIBE, 'binary', $topic);
}

# $socket->unsubscribe($topic) - sets the ZMQ_UNSUBSCRIBE option as 'binary'.
sub unsubscribe {
    my ($self, $topic) = @_;

    if ($self->socket_ptr == -1) {
        carp "Operation on closed socket";
        return;
    }

    $self->set(ZMQ_UNSUBSCRIBE, 'binary', $topic);
}

# Returns the ZMQ_EVENTS option (read as 'int') masked with ZMQ_POLLIN.
sub has_pollin {
    if ($_[0]->socket_ptr == -1) {
        carp "Operation on closed socket";
        return;
    }

    return $_[0]->get(ZMQ_EVENTS, 'int') & ZMQ_POLLIN;
}

# Returns the ZMQ_EVENTS option (read as 'int') masked with ZMQ_POLLOUT.
sub has_pollout {
    if ($_[0]->socket_ptr == -1) {
        carp "Operation on closed socket";
        return;
    }

    return $_[0]->get(ZMQ_EVENTS, 'int') & ZMQ_POLLOUT;
}

# $socket->get($opt, $opt_type)
#
# Reads socket option $opt through the zmq_getsockopt_* function matching
# $opt_type, which must be one of 'binary', 'string', 'int', 'int64_t' or
# 'uint64_t'; any other type croaks with "unknown type ...".
#
# Returns the option value, or empty when the socket is closed, when
# has_error is true after a binary/string read, or when the value is
# undefined or the empty string.
sub get {
    my ($self, $opt, $opt_type) = @_;

    if ($self->socket_ptr == -1) {
        carp "Operation on closed socket";
        return;
    }

    # an undefined type falls through to the "unknown type" croak below
    my $type = $opt_type // '';

    my $optval;
    my $optval_len;

    if ($type =~ /^(?:binary|string)$/) {
        # ZMQ_IDENTITY uses binary type and can be at most 255 bytes long
        #
        # ZMQ_LAST_ENDPOINT uses string type and expects a buffer large
        # enough to hold an endpoint string
        #
        # So for these cases 256 should be sufficient (including \0).
        # Other binary/string opts are being added all the time, and
        # hopefully this value scales, but we can always increase it if
        # necessary
        my $optval_ptr = malloc(256);
        $optval_len    = 256;

        # check_error is defined outside this file and may throw (presumably
        # when die_on_error is set); trap that so the malloc'd buffer is
        # released before the exception continues to the caller
        my $checked = eval {
            $self->check_error(
                'zmq_getsockopt',
                zmq_getsockopt_binary(
                    $self->socket_ptr,
                    $opt,
                    $optval_ptr,
                    \$optval_len
                )
            );
            1;
        };

        unless ($checked) {
            my $error = $@ || 'zmq_getsockopt failed';
            free($optval_ptr);
            die $error;
        }

        if ($self->has_error) {
            free($optval_ptr);
            return;
        }

        my $copy_len = $optval_len;
        if ($type ne 'binary') { # string
            # FFI::Platypus already appends a null terminating byte for
            # strings, so strip the one included by zeromq (otherwise test
            # comparisons fail due to the extra NUL); never go below zero
            $copy_len = $optval_len > 0 ? $optval_len - 1 : 0;
        }

        $optval = buffer_to_scalar($optval_ptr, $copy_len);
        free($optval_ptr);
    }
    elsif ($type eq 'int') {
        $optval_len = $self->sockopt_sizes->{'int'};
        $self->check_error(
            'zmq_getsockopt',
            zmq_getsockopt_int(
                $self->socket_ptr,
                $opt,
                \$optval,
                \$optval_len
            )
        );
    }
    elsif ($type eq 'int64_t') {
        $optval_len = $self->sockopt_sizes->{'sint64'};
        $self->check_error(
            'zmq_getsockopt',
            zmq_getsockopt_int64(
                $self->socket_ptr,
                $opt,
                \$optval,
                \$optval_len
            )
        );
    }
    elsif ($type eq 'uint64_t') {
        $optval_len = $self->sockopt_sizes->{'uint64'};
        $self->check_error(
            'zmq_getsockopt',
            zmq_getsockopt_uint64(
                $self->socket_ptr,
                $opt,
                \$optval,
                \$optval_len
            )
        );
    }
    else {
        croak "unknown type $type";
    }

    # $optval can still be undefined here (e.g. a failed numeric read that
    # did not throw), so test definedness before the string comparison
    if (defined $optval && $optval ne '') {
        return $optval;
    }

    return;
}

# $socket->set($opt, $opt_type, $optval)
#
# Writes socket option $opt through the zmq_setsockopt_* function matching
# $opt_type, which must be one of 'binary', 'string', 'int', 'int64_t' or
# 'uint64_t'; any other type croaks with "unknown type ...". The return code
# is handed to check_error. Always returns empty.
sub set {
    my ($self, $opt, $opt_type, $optval) = @_;

    if ($self->socket_ptr == -1) {
        carp "Operation on closed socket";
        return;
    }

    # an undefined type falls through to the "unknown type" croak below
    my $type = $opt_type // '';

    if ($type =~ /^(?:binary|string)$/) {
        my ($optval_ptr, $optval_len) = scalar_to_buffer($optval);
        $self->check_error(
            'zmq_setsockopt',
            zmq_setsockopt_binary(
                $self->socket_ptr,
                $opt,
                $optval_ptr,
                $optval_len
            )
        );
    }
    elsif ($type eq 'int') {
        $self->check_error(
            'zmq_setsockopt',
            zmq_setsockopt_int(
                $self->socket_ptr,
                $opt,
                \$optval,
                $self->sockopt_sizes->{'int'}
            )
        );
    }
    elsif ($type eq 'int64_t') {
        $self->check_error(
            'zmq_setsockopt',
            zmq_setsockopt_int64(
                $self->socket_ptr,
                $opt,
                \$optval,
                $self->sockopt_sizes->{'sint64'}
            )
        );
    }
    elsif ($type eq 'uint64_t') {
        $self->check_error(
            'zmq_setsockopt',
            zmq_setsockopt_uint64(
                $self->socket_ptr,
                $opt,
                \$optval,
                $self->sockopt_sizes->{'uint64'}
            )
        );
    }
    else {
        croak "unknown type $type";
    }

    return;
}

# $socket->close()
#
# Closes the object's zmq_msg_t and the socket, then marks the socket closed
# by setting socket_ptr to -1. Does nothing when called from a thread or
# process other than the one recorded in _tid / _pid.
sub close {
    my ($self) = @_;

    if ($self->socket_ptr == -1) {
        carp "Operation on closed socket";
        return;
    }

    # don't try to cleanup socket cloned from another thread
    return unless $self->_tid == current_tid();

    # don't try to cleanup socket copied from another process (fork)
    return unless $self->_pid == $$;

    $self->check_error(
        'zmq_msg_close',
        zmq_msg_close($self->_zmq_msg_t)
    );

    $self->check_error(
        'zmq_close',
        zmq_close($self->socket_ptr)
    );

    $self->socket_ptr(-1);
}


# Moo destruction hook: deregisters the socket from its context (when one is
# defined) and closes the socket unless it is already closed.
sub DEMOLISH {
    my ($self) = @_;

    # remove ourselves from the context object so that we dont leak
    $self->context->_remove_socket($self) if (defined $self->context);

    return if $self->socket_ptr == -1;

    $self->close();
}

1;

# vim:ft=perl

__END__

=pod

=encoding UTF-8

=head1 NAME

ZMQ::FFI::ZMQ4_1::Socket

=head1 VERSION

version 1.17

=head1 AUTHOR

Dylan Cali <calid1984@gmail.com>

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2019 by Dylan Cali.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=cut