1use strict;
2use warnings;
3
4use Test::More;
5use Test::Warnings;
6use Test::Exception;
7
8use ZMQ::FFI qw(ZMQ_STREAMER ZMQ_PUSH ZMQ_PULL);
9use ZMQ::FFI::Util qw(zmq_version);
10
11use Time::HiRes q(usleep);
12use POSIX ":sys_wait_h";
13
# ipc endpoints shared by the device process and the test sockets.
# $$ (the current pid) is interpolated so each run gets its own paths.
my $server_address = "ipc:///tmp/test-zmq-ffi-$$-front";
my $worker_address = "ipc:///tmp/test-zmq-ffi-$$-back";

# Pid of the forked device process; only assigned on the zmq 2.x path below.
my $device;
18
# Build a ZMQ_STREAMER device: a PULL socket bound to $server_address
# feeding a PUSH socket bound to $worker_address.
#
# Takes no arguments. If the device() call ever returns, a warning is
# emitted and the current process exits with status 0, so this sub does
# not return normally. If device() throws instead, the exception
# propagates to the caller.
sub mkdevice {
    my $context = ZMQ::FFI->new();

    # Inbound side: producers connect here and push messages in
    my $frontend = $context->socket(ZMQ_PULL);
    $frontend->bind($server_address);

    # Outbound side: workers connect here and pull messages out
    my $backend = $context->socket(ZMQ_PUSH);
    $backend->bind($worker_address);

    $context->device(ZMQ_STREAMER, $frontend, $backend);

    # Only reached if device() returned
    warn "device exited: $!";
    exit 0;
}
33
# Only the first element returned by zmq_version() (the major version) is
# needed to decide which path to exercise.
my ($major) = zmq_version();

if ($major <= 2) {
    # Run the streamer device in its own process; the parent keeps the
    # child's pid in $device so the subtest can terminate and reap it.
    $device = fork;
    die "fork failed: $!" unless defined $device;

    # Child: mkdevice() ends in exit, so the child never runs the tests below
    mkdevice() if $device == 0;
}
else {
    # On newer zmq the device call is expected to die with a version error
    throws_ok { mkdevice() }
        qr/zmq_device not available in zmq >= 3\.x/,
        'zmq_device version error for zmq >= 3.x';
}
49
# End-to-end check of the streamer device (zmq 2.x only): push a message
# into the device's front endpoint, pull it from the back endpoint, then
# terminate and reap the forked device process.
subtest 'device', sub {
    # Skip before allocating any zmq resources
    if ($major > 2) {
        plan skip_all => 'zmq_device not available in zmq >= 3.x';
    }

    my $ctx = ZMQ::FFI->new();

    my $server = $ctx->socket(ZMQ_PUSH);
    $server->connect($server_address);

    my $worker = $ctx->socket(ZMQ_PULL);
    $worker->connect($worker_address);

    my $message = 'ohhai';
    $server->send($message);

    # Poll in 100ms steps to give the device time to forward the message.
    # The wait is bounded (50 polls, about 5 seconds) so that a lost message
    # or a device process that failed to start produces a test failure
    # instead of hanging the test run forever.
    my $poll_interval_us = 100_000;
    my $max_polls        = 50;

    my $ready = $worker->has_pollin;
    my $polls = 0;
    while (!$ready && $polls++ < $max_polls) {
        usleep $poll_interval_us;
        $ready = $worker->has_pollin;
    }

    if ($ready) {
        my $payload = $worker->recv;
        is $payload, $message, "Message received";
    }
    else {
        fail "Message received";
        diag "no message arrived from the device after $max_polls polls";
    }

    # Always stop and reap the device process, even after a timeout
    kill TERM => $device;
    waitpid($device, 0);
};


done_testing;
81
82