1#  Copyright (c) 1997-2021
2#  Ewgenij Gawrilow, Michael Joswig, and the polymake team
3#  Technische Universität Berlin, Germany
4#  https://polymake.org
5#
6#  This program is free software; you can redistribute it and/or modify it
7#  under the terms of the GNU General Public License as published by the
8#  Free Software Foundation; either version 2, or (at your option) any
9#  later version: http://www.gnu.org/licenses/gpl.txt.
10#
11#  This program is distributed in the hope that it will be useful,
12#  but WITHOUT ANY WARRANTY; without even the implied warranty of
13#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14#  GNU General Public License for more details.
15#-------------------------------------------------------------------------------
16
17use strict;
18use namespaces;
19use warnings qw(FATAL void syntax misc);
20
# Shared bookkeeping for all channels multiplexed with select().
package Polymake::Selector;
use Fcntl;

# Members currently open: an entry is added in Member::new and removed in Member::bye.
declare %active;
# Members indexed by file descriptor number; the stored references are weakened.
declare @channels;
# Bit vectors (one bit per descriptor) handed to select() as the read and write sets.
declare $rmask="";
declare $wmask="";
28
# Base class of a single channel (file handle) taking part in the select() multiplexing.
package Polymake::Selector::Member;
use Polymake::Struct (
   # constructor signature: presumably exactly one mandatory argument - TODO confirm against Polymake::Struct
   [ new => '$' ],
   # the file handle, initialized from the first constructor argument
   [ '$handle' => '#1' ],
   # descriptor number of the handle; presumably a single field reachable under both
   # accessor names rfd and wfd - TODO confirm against Polymake::Struct
   [ '$rfd | wfd' => 'fileno(#1)' ],
);
35
# Constructor: builds the object with the generated _new (called with the caller's @_)
# and registers it as an active, readable channel.
# When exactly one other member has been active so far, both the newcomer and
# that member are notified via not_alone().
# Returns the new object.
sub new {
   my $self=&_new;
   if (keys(%active)==1) {
      # keys() has just reset the hash iterator, so each() delivers the sole existing member
      $self->not_alone;
      (each %active)->not_alone;
   }
   $active{$self}=0;

   # make the descriptor visible to select() and allow the reverse lookup descriptor -> member
   my $fd=$self->rfd;
   vec($rmask, $fd, 1)=1;
   weak($channels[$fd]=$self);
   return $self;
}
47
# Tied-handle hook for fileno(): reports the stored descriptor number.
sub FILENO {
   my ($self)=@_;
   return $self->rfd;
}
49
# Final step of closing a member: removes it from the set of active members and
# invalidates its descriptor number.
#   $destroying - true when called on behalf of DESTROY; the object is then left in its class,
#                 otherwise it is re-blessed into Polymake::Selector::Closed.
# If exactly one member stays active afterwards, it is notified via alone().
# Always returns 1.
sub bye {
   my ($self, $destroying)=@_;
   delete $active{$self};
   undef $self->rfd;
   (each %active)->alone if keys(%active)==1;
   unless ($destroying) {
      bless $self, "Polymake::Selector::Closed";
   }
   1
}
60
# Withdraws the member from the select() bookkeeping: drops the descriptor -> member
# entry for the read descriptor and clears its bits in the read and write masks.
sub forget {
   my ($self)=@_;
   my $rfd=$self->rfd;
   my $wfd=$self->wfd;
   undef $channels[$rfd];
   vec($rmask, $rfd, 1)=0;
   vec($wmask, $wfd, 1)=0;
}
67
# Makes the member reachable by its write descriptor number.
# Only a weak reference is stored, so the table does not keep the object alive.
sub register_write_channel {
   my $self=shift;
   my $wfd=$self->wfd;
   weak($channels[$wfd]=$self);
}
72
# Reverts register_write_channel: removes the table entry for the write descriptor
# and stops watching that descriptor for writability.
sub unregister_write_channel {
   my $self=shift;
   my $wfd=$self->wfd;
   undef $channels[$wfd];
   vec($wmask, $wfd, 1)=0;
}
78
# Closes the channel: unregisters it from select(), closes the descriptor at the
# system level and finishes with bye(), whose result (1) is returned.
# forget and bye are invoked with the caller's @_, hence an optional second
# argument (the "destroying" flag) reaches bye unchanged.
sub CLOSE {
   my $self=$_[0];
   &forget;
   POSIX::close($self->rfd);
   &bye;
}
85
# Destructor: closes the channel unless that has already happened.
# A channel counts as still open while its descriptor number is defined and
# its bit is set in the read mask.
sub DESTROY {
   my ($self)=@_;
   my $fd=$self->rfd;
   return unless defined($fd) && vec($rmask,$fd,1);
   # the true argument is passed on to bye as the "destroying" flag
   $self->CLOSE(1);
}
92
# Notification hooks invoked when the number of active members changes between
# one and several; the base class does not react to either of them.
sub alone { return }
sub not_alone { return }
95
96#####################################################################################
97package Polymake::Selector;
98
# Waits (without timeout) until the given member becomes readable, serving all
# other channels which become ready in the meantime.
#   $member       - the channel the caller wants to read from
#   $ignore_error - errno value of select() that merely causes another attempt
# Returns 1 as soon as $member is readable, nothing on any other select() error,
# and 0 if select() reports no ready descriptors.
sub try_read {
   my ($member, $ignore_error) = @_;
   my ($rready, $wready);
   for (;;) {
      # select() modifies its arguments, therefore it gets fresh copies of the masks every time
      my $rc = select($rready = $rmask, $wready = $wmask, undef, undef);
      last unless $rc;
      if ($rc < 0) {
         next if $! == $ignore_error;
         return;
      }
      return 1 if vec($rready, $member->rfd, 1);

      # the requested channel is not ready yet: let the ready ones make progress
      foreach my $wfd (ones($wready)) {
         $channels[$wfd]->out_avail;
      }
      foreach my $rfd (ones($rready)) {
         $channels[$rfd]->in_avail;
      }
   }
   0
}
119
# Polls (zero timeout) whether the given member can be written to, serving all
# other channels which are ready in the meantime.
#   $member - the channel with pending output
# Returns the result of $member->out_avail when the member is writable,
# 0 when nothing (more) is ready, and nothing (undef in scalar context) when select() fails.
sub try_write {
   my ($member)=@_;
   my ($rready, $wready, $rc);
   # select() modifies its arguments, therefore it gets fresh copies of the masks every time
   while ($rc = select($rready=$rmask, $wready=$wmask, undef, 0)) {
      if ($rc < 0) {
         # A failed select() returns a negative (hence true) value and leaves the descriptor
         # sets untouched; they must not be mistaken for readiness information.
         next if $!{EINTR};
         return;
      }
      if (vec($wready, $member->wfd, 1)) {
         return $member->out_avail;
      }
      foreach my $wfd (ones($wready)) {
         $channels[$wfd]->out_avail;
      }
      foreach my $rfd (ones($rready)) {
         $channels[$rfd]->in_avail;
      }
   }
   0
}
136
137#####################################################################################
# Class of a member after it has been closed (see the re-blessing in Member::bye):
# all I/O requests are answered with constant values and nothing is left to clean up.
package Polymake::Selector::Closed;
use Polymake::Struct [ '@ISA' => 'Member' ];

# nothing can be read or written any more
sub READ { return 0 }
sub WRITE { return 0 }

# closing again trivially succeeds
sub CLOSE { return 1 }

# overrides Member::DESTROY with a no-op
sub DESTROY { return }
145
146#####################################################################################
# A Selector member tied to its own file handle, performing unbuffered system-level
# reads and writes and keeping leftover data in Perl strings.
package Polymake::Pipe;
use Fcntl;
use POSIX qw(:errno_h);

use Polymake::Struct (
   [ '@ISA' => 'Selector::Member' ],
   # file status flags of the handle, obtained with fcntl(F_GETFL) in construct()
   [ '$flags' => 'undef' ],
   # input already read from the descriptor but not yet handed over to the reader
   '$rbuffer',
   # output which could not be written immediately and awaits a writable descriptor
   '$wbuffer',
);
157
# Creates the Pipe object via Member::new (called with the caller's @_), ties the
# file handle to it and remembers the current file status flags of the handle.
# Returns the Pipe object.
sub construct {
   my $self=&Member::new;
   my $fh=$self->handle;
   tie *{$fh}, $self;
   $self->flags=fcntl($fh, F_GETFL, 0);
   return $self;
}
164
# Builds a Pipe with construct(), which is called with the caller's @_,
# and returns the element shifted off that @_ afterwards.
# Per the original note this is the tied handle, not the Pipe object!
# NOTE(review): this depends on what the constructors invoked with the shared @_
# leave at its front - confirm before changing the calling style.
sub new {
   &construct; shift;
}
169
# tie() hook: the invocant itself serves as the object behind the tied handle.
sub TIEHANDLE {
   return $_[0];
}
171
# Tied-handle hook for readline() in scalar context.
# With a defined input record separator $/ it returns the next record including
# the separator; with an undefined $/ it returns all data up to the end of input.
# Returns undef when the input ends before a separator is found.
# Dies when reading from the descriptor fails.
sub READLINE {
   my $self=shift;
   my ($l, $gotten)=(0);
   if (defined $/) {
      # The separator may consist of several characters.
      # Paragraph mode ($/ eq "") is not implemented; the historical one-byte step is kept for it.
      my $sep_len=length($/) || 1;
      do {
         # Resume the search slightly before the end of the part scanned so far:
         # a multi-character separator may have been split between two reads.
         my $from=$l-$sep_len+1;
         $from=0 if $from<0;
         if ((my $endl=index($self->rbuffer, $/, $from)) >= 0) {
            # cut the record together with the complete separator out of the buffer
            return substr($self->rbuffer, 0, $endl+$sep_len, "");
         }
         $l=length($self->rbuffer);
      } while ($gotten=$self->do_read($self->rbuffer,1024,$l)) > 0;
   } else {
      my $whole=$self->rbuffer;
      do {
         $gotten=$self->do_read($whole,2<<16,length($whole));
      } while ($gotten>0);
      return $whole if defined($gotten);
   }
   die "error reading from Pipe: $!\n" if !defined $gotten;
   undef;
}
192
# Tied-handle hook for read() / sysread(): READ(buffer, length, offset).
# Data already waiting in rbuffer is delivered first (at most the requested length,
# without touching the descriptor); only with an empty rbuffer a real read is made.
# Returns the number of bytes delivered, or the result of do_read.
sub READ {
   my $self=shift;
   unless (length($self->rbuffer)) {
      return $self->do_read(@_);
   }

   my (undef, $len, $offset)=@_;
   assign_min($len, length($self->rbuffer));
   my $chunk=substr($self->rbuffer,0,$len,"");
   # $_[0] is the caller's buffer: with an offset its tail is replaced, otherwise all of it
   if ($offset) {
      substr($_[0],$offset)=$chunk;
   } else {
      $_[0]=$chunk;
   }
   return $len;
}
208
# Reads up to $len bytes from the descriptor into the caller's buffer ($_[1]).
# With a non-zero offset the new data replaces the buffer contents from that
# position on, otherwise the whole buffer is overwritten.
# The pipe is closed when the result of the read compares equal to 0.
# Returns the result of POSIX::read.
sub do_read {
   my ($self, undef, $len, $offset)=@_;
   my $gotten;
   if (!$offset) {
      $gotten=POSIX::read($self->rfd, $_[1], $len);
   } else {
      my $chunk;
      $gotten=POSIX::read($self->rfd, $chunk, $len);
      substr($_[1],$offset)=$chunk if $gotten;
   }
   $self->CLOSE if $gotten==0;
   $gotten;
}
222
# Reads exactly $req bytes unless the input ends or an error occurs earlier.
#   $req - number of bytes wanted
# Returns the data gathered; it is shorter than $req only after a read error
# or when the end of input has been reached.
sub read_chunk {
   my ($self, $req)=@_;
   my $data="";
   my $l;
   while (($l=length($data)) < $req) {
      my $gotten=READ($self,$data,$req-$l,$l);
      # undef signals a read error, 0 the end of input.
      # In the latter case do_read has already closed the pipe, which re-blesses the object
      # into Polymake::Selector::Closed; READ must not be called on it again.
      last unless defined($gotten) && $gotten > 0;
   }
   $data;
}
232
# Tied-handle hook for print(): joins the arguments with the output field
# separator $, , appends the output record separator $\ and writes the result.
# Returns the result of WRITE.
sub PRINT {
   my ($self, @items)=@_;
   my $text=join($, , @items);
   $text.=$\;
   $self->WRITE($text);
}
237
# Tied-handle hook for printf(): PRINTF(format, arguments...).
# Formats the arguments and writes the result; returns the result of WRITE.
sub PRINTF {
   my ($self, $format, @args)=@_;
   # The format must be passed as a separate scalar: sprintf(@list) would evaluate
   # the array in scalar context and use the number of its elements as the format.
   $self->WRITE(sprintf($format, @args));
}
242
# Tied-handle hook for syswrite(): WRITE(data) or WRITE(data, length, offset).
# Writes as much as possible without blocking; the rest is stored in wbuffer and
# the descriptor is marked in the write mask, so that the output can be
# completed later by out_avail.
# Returns the number of bytes accepted (written or buffered),
# or undef after a write error, in which case the pipe is closed.
sub WRITE {
   my $self=shift;
   my $str;
   if (@_==1) {
      $str=$_[0];
   } else {
      $str=substr($_[0], $_[2] || 0, $_[1]);
   }
   $str="" unless defined($str);
   # The amount to write is what is really available: the requested length may exceed
   # the data, and POSIX::write must never be told to read beyond the end of the string.
   my $len=length($str);

   $self->set_non_blocking_write;
   my $written=POSIX::write($self->wfd, $str, $len);
   if (!defined($written)) {
      if ($! == POSIX::EAGAIN) {
         # the pipe is full: everything goes to the buffer
         $written=0;
      } else {
         $self->CLOSE;
         return undef;
      }
   }
   $self->set_blocking_write;
   if ($written<$len) {
      $self->wbuffer=substr($str, $written, $len-$written);
      vec($wmask, $self->wfd, 1)=1;
      # a pipe with pending output needs the Collaborative behavior even being the only channel
      $self->not_alone if keys(%active)==1;
   }
   $len;
}
269
# Tied-handle hook for eof(): delegates to the built-in eof() on the stored handle.
# NOTE(review): construct() ties that very handle to this object, so the built-in
# presumably re-enters this method while the pipe is open - confirm this cannot recurse.
sub EOF {
   my $self=shift;
   return eof($self->handle);
}
274
# Tied-handle hook for tell(): -1 if eof() holds for the stored handle, 0 otherwise.
# NOTE(review): eof() is applied to the handle tied to this object and presumably
# goes through the EOF method while the pipe is open - confirm.
sub TELL {
   my $self=shift;
   if (eof($self->handle)) {
      return -1;
   }
   return 0;
}
279
# Tied-handle hook for seek(), used here to pull input into the read buffer:
#   pos ==  0 : waits for input and consumes the next portion of it
#   pos == -1 : keeps consuming input until in_avail delivers nothing more,
#               then returns the eof() status of the handle
#   otherwise : nothing is consumed; the false result of the last comparison is returned
sub SEEK {
   my ($self, $pos)=@_;
   if ($pos == 0) {
      try_read($self, POSIX::EAGAIN) && $self->in_avail >= 0
   } elsif ($pos == -1) {
      1 while try_read($self, POSIX::EAGAIN) && $self->in_avail > 0;
      eof($self->handle);
   }
}
292
# Tied-handle hook for close(): unregisters the pipe from select(), unties and
# closes the file handle and finishes with bye(), whose result (1) is returned.
# forget and bye are invoked with the caller's @_, hence an optional second
# argument (the "destroying" flag) reaches bye unchanged.
sub CLOSE {
   my $self=$_[0];
   &forget;
   my $fh=$self->handle;
   # without the tie the built-in close() acts on the real handle
   untie *{$fh};
   close($fh);
   &bye;
}
300
# Notification that this pipe has to coexist with other channels (or has pending
# output): the object switches to the Collaborative implementation.
# Returns the re-blessed object.
sub not_alone {
   my $self=shift;
   return bless $self, "Polymake::Pipe::Collaborative";
}
305
# Switches the handle to non-blocking mode by adding O_NONBLOCK to the
# file status flags remembered in the object.
# Returns the result of fcntl().
sub set_non_blocking_write {
   my $self=shift;
   my $mode=$self->flags | O_NONBLOCK;
   fcntl($self->handle, F_SETFL, $mode);
}
310
# Restores the file status flags of the handle remembered in the object,
# undoing set_non_blocking_write.
# Returns the result of fcntl().
sub set_blocking_write {
   my $self=shift;
   my $mode=$self->flags;
   fcntl($self->handle, F_SETFL, $mode);
}
315
316#####################################################################################
# Variant of Pipe used while other channels are active or output is pending
# (see the re-blessing in Pipe::not_alone): its reads and writes go through
# try_read / try_write, which serve the other ready channels along the way.
package Polymake::Pipe::Collaborative;
use Polymake::Struct [ '@ISA' => 'Pipe' ];
319
# Called when the write descriptor is reported writable: tries to flush wbuffer.
# Returns 1 when the buffer has been written completely (the descriptor is then
# removed from the write mask), 0 when output is still pending, and undef when
# the write failed; in the last case the pipe is closed.
sub out_avail {
   my ($self)=@_;
   my $pending=length($self->wbuffer);
   my $written=POSIX::write($self->wfd, $self->wbuffer, $pending);
   unless (defined($written)) {
      unless ($! == POSIX::EAGAIN) {
         $self->CLOSE;
         return undef;  # dead pipe
      }
      # nothing could be written right now
      $written=0;
   }

   if ($written != $pending) {
      # keep only the part not written yet
      substr($self->wbuffer, 0, $written)="";
      return 0;
   }

   vec($wmask, $self->wfd, 1)=0;
   $self->wbuffer="";
   $self->alone if keys(%active)==1;
   1;
}
341
# Called when the read descriptor is reported readable: appends up to 1024 bytes
# of new input to rbuffer using the plain (non-selecting) Pipe::do_read.
# Returns the result of Pipe::do_read.
sub in_avail {
   my $self=shift;
   my $have=length($self->rbuffer);
   Pipe::do_read($self, $self->rbuffer, 1024, $have);
}
346
# Reading in the presence of other channels: first waits in try_read until this
# pipe is readable, then performs Pipe::do_read with the unchanged arguments.
# Returns the false result of try_read, or the result of Pipe::do_read.
sub do_read {
   my $self=$_[0];
   my $ready=try_read($self, POSIX::EAGAIN);
   # Pipe::do_read is invoked with the caller's @_, including the buffer alias
   $ready && &Pipe::do_read;
}
351
# Tied-handle hook for syswrite(): WRITE(data) or WRITE(data, length, offset).
# While earlier output is still pending and cannot be flushed, the new data is
# appended to wbuffer in order to preserve the output order; otherwise it is
# passed to Pipe::WRITE.
# Returns the number of bytes accepted, or nothing if the pipe turned out to be dead.
sub WRITE {
   my ($self)=@_;
   if (length($self->wbuffer) and !(my $rc=try_write($self))) {
      defined($rc) or return;
      # @_ has not been shifted: $_[0] is the object itself, the data follow
      # in $_[1], the optional length in $_[2] and the optional offset in $_[3].
      my $chunk= @_==2 ? $_[1] : substr($_[1], $_[3] || 0, $_[2]);
      $chunk="" unless defined($chunk);
      $self->wbuffer.=$chunk;
      length($chunk);
   } else {
      # Pipe::WRITE is invoked with the caller's @_
      &Pipe::WRITE;
   }
}
367
# Notification that this pipe is the only active channel left: it returns to the
# plain Pipe implementation, unless output is still pending in wbuffer.
# The callers in this file ignore the result.
sub alone {
   my ($self)=@_;
   return if length($self->wbuffer);
   bless $self, "Polymake::Pipe";
}
374
# The object is already Collaborative: nothing to change.
sub not_alone { return }
376
377#####################################################################################
378
3791
380
381# Local Variables:
382# cperl-indent-level:3
383# indent-tabs-mode:nil
384# End:
385