# Exercises ZMQ::FFI's device() wrapper.
#
# On zmq 2.x a streamer device (PULL front -> PUSH back) is run in a forked
# child and a message is pushed through it.  On zmq >= 3.x the test only
# checks that device() raises the expected "not available" error.

use strict;
use warnings;

use Test::More;
use Test::Warnings;
use Test::Exception;

use ZMQ::FFI qw(ZMQ_STREAMER ZMQ_PUSH ZMQ_PULL);
use ZMQ::FFI::Util qw(zmq_version);

use Time::HiRes q(usleep);
use POSIX ":sys_wait_h";

# Endpoints embed the PID of the test process so that concurrent runs use
# distinct ipc paths.
my $server_address = "ipc:///tmp/test-zmq-ffi-$$-front";
my $worker_address = "ipc:///tmp/test-zmq-ffi-$$-back";

# Polling budget while waiting for the forwarded message:
# 100 polls x 100ms = 10 seconds before the test is failed instead of hanging.
my $poll_interval_usec = 100_000;
my $max_polls          = 100;

# PID of the forked device process.  undef when no device was forked (or it
# has already been reaped); 0 inside the child itself.
my $device;

# Binds the front (PULL) and back (PUSH) sockets and runs a streamer device
# between them.
#
# Never returns normally: it either dies (the zmq >= 3.x branch below expects
# device() to throw) or, should device() ever return, warns and exits the
# current process.
sub mkdevice {
    my $ctx = ZMQ::FFI->new();

    my $front = $ctx->socket(ZMQ_PULL);
    $front->bind($server_address);

    my $back = $ctx->socket(ZMQ_PUSH);
    $back->bind($worker_address);

    $ctx->device(ZMQ_STREAMER, $front, $back);
    warn "device exited: $!";

    exit 0;
}

# Terminates and reaps the forked device process, if there is one.
#
# Safe to call more than once, and a no-op in the child (where $device is 0;
# signalling PID 0 would hit the whole process group).
sub stop_device {
    return unless $device;

    # waitpid overwrites $?; keep the caller's value so that calling this
    # from END cannot change the exit status of the test script.
    local $?;

    kill TERM => $device;
    waitpid($device, 0);
    undef $device;

    return;
}

# Make sure the device child is not left running if the test dies before the
# subtest reaches its own cleanup.
END { stop_device() }

my ($major) = zmq_version();
if ($major > 2) {
    throws_ok { mkdevice() }
        qr/zmq_device not available in zmq >= 3\.x/,
        'zmq_device version error for zmq >= 3.x';
}
else {
    # Set up the streamer device in its own process
    $device = fork;
    die "fork failed: $!" unless defined $device;

    if ( $device == 0 ) {
        mkdevice();
    }
}

subtest 'device', sub {
    if ($major > 2) {
        plan skip_all => 'zmq_device not available in zmq >= 3.x';
    }

    my $ctx = ZMQ::FFI->new();

    my $server = $ctx->socket(ZMQ_PUSH);
    $server->connect($server_address);

    my $worker = $ctx->socket(ZMQ_PULL);
    $worker->connect($worker_address);

    my $message = 'ohhai';
    $server->send($message);

    # The connections are set up asynchronously and the message has to travel
    # through the device, so poll in 100ms steps until it becomes readable.
    # The number of polls is bounded so that a dead device fails the test
    # rather than hanging it forever.
    my $ready = 0;
    for ( 1 .. $max_polls ) {
        if ( $worker->has_pollin ) {
            $ready = 1;
            last;
        }

        usleep $poll_interval_usec;
    }

    if ($ready) {
        my $payload = $worker->recv;
        is $payload, $message, "Message received";
    }
    else {
        fail "Message received";
        diag sprintf 'no message arrived from the device within %d seconds',
            $max_polls * $poll_interval_usec / 1_000_000;
    }

    stop_device();
};


done_testing;