1#
2# Module Generated by Template::Tiny on Thu Mar 14 04:05:17 UTC 2019
3#
4
5package ZMQ::FFI::ZMQ4_1::Socket;
6$ZMQ::FFI::ZMQ4_1::Socket::VERSION = '1.17';
7use FFI::Platypus;
8use FFI::Platypus::Buffer;
9use FFI::Platypus::Memory qw(malloc free memcpy);
10
11use Carp qw(croak carp);
12use Try::Tiny;
13
14use ZMQ::FFI::ZMQ4_1::Raw;
15use ZMQ::FFI::Custom::Raw;
16use ZMQ::FFI::Constants qw(:all);
17use ZMQ::FFI::Util qw(current_tid);
18
19use Moo;
20use namespace::clean;
21
22no if $] >= 5.018, warnings => "experimental";
23use feature 'switch';
24
25with qw(
26    ZMQ::FFI::SocketRole
27    ZMQ::FFI::ErrorHelper
28    ZMQ::FFI::Versioner
29);
30
31my $FFI_LOADED;
32
33sub BUILD {
34    my ($self) = @_;
35
36    unless ($FFI_LOADED) {
37        ZMQ::FFI::Custom::Raw::load($self->soname);
38        ZMQ::FFI::ZMQ4_1::Raw::load($self->soname);
39        $FFI_LOADED = 1;
40    }
41
42    # force init zmq_msg_t
43    $self->_zmq_msg_t;
44
45    # ensure clean edge state
46    while ( $self->has_pollin ) {
47        $self->recv();
48    }
49
50    # set default linger
51    $self->set_linger(0);
52}
53
54
55sub connect {
56    my ($self, $endpoint) = @_;
57
58    if ($_[0]->socket_ptr == -1) {
59        carp "Operation on closed socket";
60        return;
61    }
62
63    unless ($endpoint) {
64        croak 'usage: $socket->connect($endpoint)';
65    }
66
67    $self->check_error(
68        'zmq_connect',
69        zmq_connect($self->socket_ptr, $endpoint)
70    );
71}
72
73sub disconnect {
74    my ($self, $endpoint) = @_;
75
76    if ($_[0]->socket_ptr == -1) {
77        carp "Operation on closed socket";
78        return;
79    }
80
81    unless ($endpoint) {
82        croak 'usage: $socket->disconnect($endpoint)';
83    }
84
85    $self->check_error(
86        'zmq_disconnect',
87        zmq_disconnect($self->socket_ptr, $endpoint)
88    );
89}
90
91sub bind {
92    my ($self, $endpoint) = @_;
93
94    if ($_[0]->socket_ptr == -1) {
95        carp "Operation on closed socket";
96        return;
97    }
98
99    unless ($endpoint) {
100        croak 'usage: $socket->bind($endpoint)'
101    }
102
103    $self->check_error(
104        'zmq_bind',
105        zmq_bind($self->socket_ptr, $endpoint)
106    );
107}
108
109sub unbind {
110    my ($self, $endpoint) = @_;
111
112    if ($_[0]->socket_ptr == -1) {
113        carp "Operation on closed socket";
114        return;
115    }
116
117    unless ($endpoint) {
118        croak 'usage: $socket->unbind($endpoint)';
119    }
120
121    $self->check_error(
122        'zmq_unbind',
123        zmq_unbind($self->socket_ptr, $endpoint)
124    );
125}
126
127sub send {
128    # 0: self
129    # 1: data
130    # 2: flags
131
132    if ($_[0]->socket_ptr == -1) {
133        carp "Operation on closed socket";
134        return;
135    }
136
137    $_[0]->{last_errno} = 0;
138
139    use bytes;
140    my $length = length($_[1]);
141    no bytes;
142
143    if ( -1 == zmq_send($_[0]->socket_ptr, $_[1], $length, ($_[2] // 0)) ) {
144        $_[0]->{last_errno} = zmq_errno();
145
146        if ($_[0]->die_on_error) {
147            $_[0]->fatal('zmq_send');
148        }
149
150        return;
151    }
152}
153
154sub send_multipart {
155    # 0: self
156    # 1: partsref
157    # 2: flags
158
159    if ($_[0]->socket_ptr == -1) {
160        carp "Operation on closed socket";
161        return;
162    }
163
164    my @parts = @{$_[1] // []};
165    unless (@parts) {
166        croak 'usage: send_multipart($parts, $flags)';
167    }
168
169    for my $i (0..$#parts-1) {
170        $_[0]->send($parts[$i], ($_[2] // 0) | ZMQ_SNDMORE);
171
172        # don't need to explicitly check die_on_error
173        # since send would have exploded if it was true
174        if ($_[0]->has_error) {
175            return;
176        }
177    }
178
179    $_[0]->send($parts[$#parts], $_[2] // 0);
180}
181
182sub recv {
183    # 0: self
184    # 1: flags
185
186    if ($_[0]->socket_ptr == -1) {
187        carp "Operation on closed socket";
188        return;
189    }
190
191    $_[0]->{last_errno} = 0;
192
193    # retval = msg size
194    my $retval = zmq_msg_recv($_[0]->{"_zmq_msg_t"}, $_[0]->socket_ptr, $_[1] // 0);
195
196    if ( $retval == -1 ) {
197        $_[0]->{last_errno} = zmq_errno();
198
199        if ($_[0]->die_on_error) {
200            $_[0]->fatal('zmq_msg_recv');
201        }
202
203
204        return;
205    }
206
207    if ($retval) {
208        return buffer_to_scalar(zmq_msg_data($_[0]->{"_zmq_msg_t"}), $retval);
209    }
210
211    return '';
212}
213
214sub recv_multipart {
215    # 0: self
216    # 1: flags
217
218    if ($_[0]->socket_ptr == -1) {
219        carp "Operation on closed socket";
220        return;
221    }
222
223    my @parts = ( $_[0]->recv($_[1]) );
224
225    if ($_[0]->has_error) {
226        return;
227    }
228
229    my $type = ($_[0]->version)[0] == 2 ? 'int64_t' : 'int';
230
231    while ( $_[0]->get(ZMQ_RCVMORE, $type) ){
232        push @parts, $_[0]->recv($_[1] // 0);
233
234        # don't need to explicitly check die_on_error
235        # since recv would have exploded if it was true
236        if ($_[0]->has_error) {
237            return;
238        }
239    }
240
241    return @parts;
242}
243
244sub get_fd {
245    if ($_[0]->socket_ptr == -1) {
246        carp "Operation on closed socket";
247        return;
248    }
249
250    return $_[0]->get(ZMQ_FD, 'int');
251}
252
253sub get_linger {
254    if ($_[0]->socket_ptr == -1) {
255        carp "Operation on closed socket";
256        return;
257    }
258
259    return $_[0]->get(ZMQ_LINGER, 'int');
260}
261
262sub set_linger {
263    my ($self, $linger) = @_;
264
265    if ($_[0]->socket_ptr == -1) {
266        carp "Operation on closed socket";
267        return;
268    }
269
270    $self->set(ZMQ_LINGER, 'int', $linger);
271}
272
273sub get_identity {
274    if ($_[0]->socket_ptr == -1) {
275        carp "Operation on closed socket";
276        return;
277    }
278
279    return $_[0]->get(ZMQ_IDENTITY, 'binary');
280}
281
282sub set_identity {
283    my ($self, $id) = @_;
284
285    if ($_[0]->socket_ptr == -1) {
286        carp "Operation on closed socket";
287        return;
288    }
289
290    $self->set(ZMQ_IDENTITY, 'binary', $id);
291}
292
293sub subscribe {
294    my ($self, $topic) = @_;
295
296    if ($_[0]->socket_ptr == -1) {
297        carp "Operation on closed socket";
298        return;
299    }
300
301    $self->set(ZMQ_SUBSCRIBE, 'binary', $topic);
302}
303
304sub unsubscribe {
305    my ($self, $topic) = @_;
306
307    if ($_[0]->socket_ptr == -1) {
308        carp "Operation on closed socket";
309        return;
310    }
311
312    $self->set(ZMQ_UNSUBSCRIBE, 'binary', $topic);
313}
314
315sub has_pollin {
316    if ($_[0]->socket_ptr == -1) {
317        carp "Operation on closed socket";
318        return;
319    }
320
321    return $_[0]->get(ZMQ_EVENTS, 'int') & ZMQ_POLLIN;
322}
323
324sub has_pollout {
325    if ($_[0]->socket_ptr == -1) {
326        carp "Operation on closed socket";
327        return;
328    }
329
330    return $_[0]->get(ZMQ_EVENTS, 'int') & ZMQ_POLLOUT;
331}
332
333sub get {
334    my ($self, $opt, $opt_type) = @_;
335
336    if ($_[0]->socket_ptr == -1) {
337        carp "Operation on closed socket";
338        return;
339    }
340
341    my $optval;
342    my $optval_len;
343
344    for ($opt_type) {
345        when (/^(binary|string)$/) {
346            # ZMQ_IDENTITY uses binary type and can be at most 255 bytes long
347            #
348            # ZMQ_LAST_ENDPOINT uses string type and expects a buffer large
349            # enough to hold an endpoint string
350            #
351            # So for these cases 256 should be sufficient (including \0).
352            # Other binary/string opts are being added all the time, and
353            # hopefully this value scales, but we can always increase it if
354            # necessary
355            my $optval_ptr = malloc(256);
356            $optval_len    = 256;
357
358            $self->check_error(
359                'zmq_getsockopt',
360                zmq_getsockopt_binary(
361                    $self->socket_ptr,
362                    $opt,
363                    $optval_ptr,
364                    \$optval_len
365                )
366            );
367
368            if ($self->has_error) {
369                free($optval_ptr);
370                return;
371            }
372
373            if ($opt_type eq 'binary') {
374                $optval = buffer_to_scalar($optval_ptr, $optval_len);
375                free($optval_ptr);
376            }
377            else { # string
378                # FFI::Platypus already appends a null terminating byte for
379                # strings, so strip the one included by zeromq (otherwise test
380                # comparisons fail due to the extra NUL)
381                $optval = buffer_to_scalar($optval_ptr, $optval_len-1);
382                free($optval_ptr);
383            }
384        }
385
386        when ('int') {
387            $optval_len = $self->sockopt_sizes->{'int'};
388            $self->check_error(
389                'zmq_getsockopt',
390                zmq_getsockopt_int(
391                    $self->socket_ptr,
392                    $opt,
393                    \$optval,
394                    \$optval_len
395                )
396            );
397        }
398
399        when ('int64_t') {
400            $optval_len = $self->sockopt_sizes->{'sint64'};
401            $self->check_error(
402                'zmq_getsockopt',
403                zmq_getsockopt_int64(
404                    $self->socket_ptr,
405                    $opt,
406                    \$optval,
407                    \$optval_len
408                )
409            );
410        }
411
412        when ('uint64_t') {
413            $optval_len = $self->sockopt_sizes->{'uint64'};
414            $self->check_error(
415                'zmq_getsockopt',
416                zmq_getsockopt_uint64(
417                    $self->socket_ptr,
418                    $opt,
419                    \$optval,
420                    \$optval_len
421                )
422            );
423        }
424
425        default {
426            croak "unknown type $opt_type";
427        }
428    }
429
430    if ($optval ne '') {
431        return $optval;
432    }
433
434    return;
435}
436
437sub set {
438    my ($self, $opt, $opt_type, $optval) = @_;
439
440    if ($_[0]->socket_ptr == -1) {
441        carp "Operation on closed socket";
442        return;
443    }
444
445    for ($opt_type) {
446        when (/^(binary|string)$/) {
447            my ($optval_ptr, $optval_len) = scalar_to_buffer($optval);
448            $self->check_error(
449                'zmq_setsockopt',
450                zmq_setsockopt_binary(
451                    $self->socket_ptr,
452                    $opt,
453                    $optval_ptr,
454                    $optval_len
455                )
456            );
457        }
458
459        when ('int') {
460            $self->check_error(
461                'zmq_setsockopt',
462                zmq_setsockopt_int(
463                    $self->socket_ptr,
464                    $opt,
465                    \$optval,
466                    $self->sockopt_sizes->{'int'}
467                )
468            );
469        }
470
471        when ('int64_t') {
472            $self->check_error(
473                'zmq_setsockopt',
474                zmq_setsockopt_int64(
475                    $self->socket_ptr,
476                    $opt,
477                    \$optval,
478                    $self->sockopt_sizes->{'sint64'}
479                )
480            );
481        }
482
483        when ('uint64_t') {
484            $self->check_error(
485                'zmq_setsockopt',
486                zmq_setsockopt_uint64(
487                    $self->socket_ptr,
488                    $opt,
489                    \$optval,
490                    $self->sockopt_sizes->{'uint64'}
491                )
492            );
493        }
494
495        default {
496            croak "unknown type $opt_type";
497        }
498    }
499
500    return;
501}
502
503sub close {
504    my ($self) = @_;
505
506    if ($_[0]->socket_ptr == -1) {
507        carp "Operation on closed socket";
508        return;
509    }
510
511    # don't try to cleanup socket cloned from another thread
512    return unless $self->_tid == current_tid();
513
514    # don't try to cleanup socket copied from another process (fork)
515    return unless $self->_pid == $$;
516
517    $self->check_error(
518        'zmq_msg_close',
519        zmq_msg_close($self->_zmq_msg_t)
520    );
521
522    $self->check_error(
523        'zmq_close',
524        zmq_close($self->socket_ptr)
525    );
526
527    $self->socket_ptr(-1);
528}
529
530
531sub DEMOLISH {
532    my ($self) = @_;
533
534    # remove ourselves from the context object so that we dont leak
535    $self->context->_remove_socket($self) if (defined $self->context);
536
537    return if $self->socket_ptr == -1;
538
539    $self->close();
540}
541
5421;
543
544# vim:ft=perl
545
546__END__
547
548=pod
549
550=encoding UTF-8
551
552=head1 NAME
553
554ZMQ::FFI::ZMQ4_1::Socket
555
556=head1 VERSION
557
558version 1.17
559
560=head1 AUTHOR
561
562Dylan Cali <calid1984@gmail.com>
563
564=head1 COPYRIGHT AND LICENSE
565
566This software is copyright (c) 2019 by Dylan Cali.
567
568This is free software; you can redistribute it and/or modify it under
569the same terms as the Perl 5 programming language system itself.
570
571=cut
572