1#############################################################################
2##
3##  iohub.gd               GAP 4 package IO
4##                                                           Max Neunhoeffer
5##
6##  Copyright (C) by Max Neunhoeffer
7##  This file is free software, see license information at the end.
8##
9##  This file contains functions for a generic client server framework
10##  for GAP
11##
12##  Main points:
13##
14##   - handle multiple connections using IO multiplexing
15##   - single threaded
16##   - use pickling for data transfer
17##
18
InstallMethod( IOHub, "constructor without arguments", [ ],
  function( )
    # Creates a fresh, active IO hub. It starts without a serving socket
    # (sock = fail), with empty input and output queues and with empty
    # per-connection lists (tosend, torecv, inbuf, outbuf, connections).
    local hub;
    hub := rec( );
    hub.isactive := true;
    hub.sock := fail;
    hub.connections := [];
    hub.inqueue := [];
    hub.outqueue := [];
    hub.inbuf := [];
    hub.outbuf := [];
    hub.torecv := [];
    hub.tosend := [];
    Objectify(IOHubType, hub);
    return hub;
  end );
28
InstallMethod( AttachServingSocket, "for an address and a port",
  [ IsIOHub, IsStringRep, IsPosInt ],
  function( s, address, port )
    # Creates a TCP socket, binds it to <address>:<port> and puts it into
    # listening state (backlog 5). On success the descriptor is stored in
    # s!.sock and returned. If any step fails, the socket is closed again,
    # s!.sock is reset to fail and fail is returned.
    s!.sock := IO_socket(IO.PF_INET,IO.SOCK_STREAM,"tcp");
    if s!.sock = fail then return fail; fi;
    IO_setsockopt(s!.sock,IO.SOL_SOCKET,IO.SO_REUSEADDR,"\001\001\001\001");
    # Both calls signal an error by returning fail. This has to be tested
    # with "= fail": applying "not" to fail is itself an error in GAP,
    # so the cleanup branch would never be reached.
    if IO_bind(s!.sock,IO_MakeIPAddressPort(address,port)) = fail or
       IO_listen(s!.sock,5) = fail then
        IO_close(s!.sock);
        s!.sock := fail;
        return fail;
    fi;
    return s!.sock;
  end );
47
InstallMethod( CloseConnection, "for an IO hub and a positive integer",
  [ IsIOHub, IsPosInt ],
  function( s, nr )
    # Closes connection number <nr> of the hub <s>: drops all queued
    # messages from or for this connection, closes its file descriptors
    # and unbinds all per-connection state.
    # Returns fail if there is no connection with this number; otherwise
    # nothing is returned.
    local conn,i;
    if not(IsBound(s!.connections[nr])) then
        return fail;
    fi;
    conn := s!.connections[nr];
    if IsBound(conn[1]) then
        # Remove all pending input of this connection. The queue itself
        # is modified in place, so only advance i if nothing was removed.
        i := 1;
        while i <= Length(s!.inqueue) do
            if s!.inqueue[i][1] = nr then
                Remove(s!.inqueue,i);
            else
                i := i + 1;
            fi;
        od;
        IO_close(conn[1]);
    fi;
    if IsBound(conn[2]) then
        # Remove all pending output for this connection:
        i := 1;
        while i <= Length(s!.outqueue) do
            if s!.outqueue[i][1] = nr then
                Remove(s!.outqueue,i);
            else
                i := i + 1;
            fi;
        od;
        # A socket connection uses the same descriptor for both
        # directions, it must not be closed a second time:
        if not(IsBound(conn[1])) or conn[1] <> conn[2] then
            IO_close(conn[2]);
        fi;
    fi;
    Unbind(s!.connections[nr]);
    Unbind(s!.tosend[nr]);
    Unbind(s!.torecv[nr]);
    Unbind(s!.inbuf[nr]);
    Unbind(s!.outbuf[nr]);
    Print("Connection #",nr," closed.\n");
  end );
85
InstallMethod( ShutdownServingSocket, "for an IO hub",
  [IsIOHub],
  function(s)
    # Closes the serving socket of the hub, if there is one, and marks
    # the hub as having none by resetting s!.sock to fail.
    if s!.sock = fail then
        return;
    fi;
    IO_close(s!.sock);
    s!.sock := fail;
  end );
94
InstallMethod( Shutdown, "for an IO hub",
  [ IsIOHub ],
  function( s )
    # Shuts the whole hub down: every connection number up to the current
    # length of the connection list is closed, then the serving socket,
    # and finally the hub is marked inactive. Does nothing if the hub has
    # already been shut down.
    local nr;
    if s!.isactive then
        for nr in [1..Length(s!.connections)] do
            CloseConnection(s,nr);
        od;
        ShutdownServingSocket(s);
        s!.isactive := false;
    fi;
  end );
106
InstallMethod( ViewObj, "for a tcp server",
  [ IsIOHub ],
  function( s )
    # Prints a short description of the hub: whether it has a serving
    # socket and how many connections have a descriptor for reading and
    # for writing. A hub that was shut down is reported as such.
    local countfds,readers,writers;

    if not(s!.isactive) then
        Print("<IO hub already shut down>");
        return;
    fi;

    # Number of existing connections having a descriptor in slot <slot>
    # (1 = input, 2 = output):
    countfds := function( slot )
        local cnt,k;
        cnt := 0;
        for k in [1..Length(s!.connections)] do
            if IsBound(s!.connections[k]) and
               IsBound(s!.connections[k][slot]) then
                cnt := cnt + 1;
            fi;
        od;
        return cnt;
    end;

    Print("<IO hub");
    if s!.sock <> fail then
        Print(" with serving socket");
    fi;
    readers := countfds(1);
    if readers > 0 then
        Print(", reading from ",readers," fds");
    fi;
    writers := countfds(2);
    if writers > 0 then
        Print(", writing to ",writers," fds");
    fi;
    Print(">");
  end );
133
InstallMethod( NewConnection, "for an IO hub and two integers",
  [ IsIOHub, IsInt, IsInt ],
  function( s, inp, out )
    # Registers a new connection reading from descriptor <inp> and writing
    # to descriptor <out>. A value that is not positive means that the
    # connection has no descriptor for that direction.
    # Returns the number of the new connection, or fail if the hub has
    # been shut down.
    local fds,nr;
    if not(s!.isactive) then return fail; fi;
    fds := [];
    if inp > 0 then fds[1] := inp; fi;
    if out > 0 then fds[2] := out; fi;
    # The new number is one past the current end of the connection list.
    # NOTE(review): the list gets shorter when its last entry is unbound,
    # so the number of a closed last connection can be handed out again.
    nr := Length(s!.connections)+1;
    s!.connections[nr] := fds;
    # Nothing to send or to receive yet; the input buffer first has to
    # take the 8 byte length header of the next message.
    s!.inbuf[nr] := EmptyString(8);
    s!.outbuf[nr] := "";
    s!.torecv[nr] := 0;
    s!.tosend[nr] := 0;
    return nr;
  end );
150
InstallMethod( NewTCPConnection, "for an IO hub, an address and a port",
  [ IsIOHub, IsStringRep, IsPosInt ],
  function( s, address, port )
    # Opens a TCP connection to <address>:<port> and registers it with
    # the hub, using the socket for both reading and writing.
    # Returns the new connection number, or fail if the socket could not
    # be created, the connect failed or the hub is shut down.
    local t;
    t := IO_socket(IO.PF_INET,IO.SOCK_STREAM,"tcp");
    # Without a socket there is nothing to connect or to close:
    if t = fail then return fail; fi;
    if IO_connect(t,IO_MakeIPAddressPort(address,port)) = fail then
        IO_close(t);
        return fail;
    fi;
    return NewConnection(s,t,t);
  end );
162
InstallMethod( AcceptNewConnection, "for an IO hub",
  [ IsIOHub ],
  function( s )
    # Accepts one pending connection on the serving socket and registers
    # it with the hub for reading and writing.
    # Returns the new connection number, or fail if the hub is shut down,
    # has no serving socket, or the accept itself failed.
    local t,i;
    # The component sock is always bound; "no serving socket" is
    # represented by the value fail, so this is what has to be tested.
    if not(s!.isactive) or s!.sock = fail then
        return fail;
    fi;
    t := IO_accept(s!.sock,IO_MakeIPAddressPort("0.0.0.0",0));
    # A failed accept must not be registered as a connection:
    if t = fail then
        return fail;
    fi;
    i := NewConnection( s, t, t );
    Print("Got new connection #",i,"...\n");
    return i;
  end );
175
InstallMethod( GetInput, "for an IO hub and an integer",
  [ IsIOHub, IsInt ],
  function( s, i )
    # Takes the oldest entry out of the input queue and returns it.
    # For i = 0 entries of all connections are considered, otherwise only
    # entries whose first component is the connection number <i>.
    # Returns false if there is no such entry and fail if the hub has
    # been shut down.
    local pos;
    if not(s!.isactive) then return fail; fi;
    if i = 0 then
        if IsEmpty(s!.inqueue) then
            return false;
        fi;
        return Remove(s!.inqueue,1);
    fi;
    # first queue position holding something from connection i:
    pos := PositionProperty(s!.inqueue, entry -> entry[1] = i);
    if pos = fail then
        return false;
    fi;
    return Remove(s!.inqueue,pos);
  end );
199
InstallMethod( SubmitOutput, "for an IO hub, a positive integers and an obj",
  [ IsIOHub, IsPosInt, IsStringRep ],
  function( s, i, o )
    # Appends the string <o> to the output queue as a message for
    # connection number <i> and returns true. Nothing is written here.
    # Raises an error (and returns fail, if execution is continued) when
    # the connection does not exist or has no output descriptor.
    if not(IsBound(s!.connections[i])) or
       not(IsBound(s!.connections[i][2])) then
        Error("This connection is closed or has no output");
        return fail;
    fi;
    Add(s!.outqueue,[i,o]);
    return true;
  end );
210
InstallMethod( OutputQueue, "for an IO hub",
  [ IsIOHub ],
  function( s )
    # Hands out the output queue itself (not a copy); its entries are
    # pairs [connection number, string] as added by SubmitOutput.
    return s!.outqueue;
  end );
214
InstallMethod( InputQueue, "for an IO hub",
  [ IsIOHub ],
  function( s )
    # Hands out the input queue itself (not a copy); its entries are
    # pairs [connection number, string] of completely received messages.
    return s!.inqueue;
  end );
218
InstallMethod( StoreLenIn8Bytes, "for a string and a len",
  [IsStringRep, IsInt],
  function( st, len )
    # Overwrites the first 8 characters of <st> with the base 256 digits
    # of <len>, least significant digit first. Returns nothing.
    local low,pos;
    pos := 1;
    while pos <= 8 do
      low := len mod 256;            # the next base 256 digit
      st[pos] := CHAR_INT(low);
      len := (len - low) / 256;      # exact division, digit is removed
      pos := pos + 1;
    od;
  end );
229
InstallMethod( GetLenFrom8Bytes, "for a string",
  [IsStringRep],
  function( st )
    # Reads the first 8 characters of <st> as base 256 digits, least
    # significant digit first, and returns the resulting integer. This
    # is the inverse of what StoreLenIn8Bytes writes.
    return Sum([1..8], pos -> INT_CHAR(st[pos]) * 256^(pos-1));
  end );
240
InstallMethod( DoIO, "for an IO hub", [ IsIOHub ],
  function( s )
    # Convenience variant: the same as the two argument method called
    # with false as second argument (no blocking).
    local block;
    block := false;
    return DoIO(s,block);
  end );
243
InstallMethod( DoIO, "for an IO hub and a boolean",
  [ IsIOHub, IsBool ],
  function( s, block )
    # This uses select to see to all open connections including the
    # original socket to perform all possible IO on them. New connections
    # are created if needed and those to which network connectivity is
    # lost are closed.
    # Note that this does not automatically call the worker on the input
    # queue.
    # However, it does serve the output queue.
    #
    # Messages travel as an 8 byte length header (see StoreLenIn8Bytes)
    # followed by that many bytes of data. For connection i, tosend[i] is
    # the number of bytes of outbuf[i] still to be written, torecv[i] is
    # the length of the message currently being received, where 0 means
    # that the header is still being read.
    #
    # If <block> is true, the first select waits until something can be
    # done, all further ones only poll.
    # Returns fail if the hub is shut down, otherwise true or false
    # according to whether any IO activity took place.
    local activity,bytes,hadactivity,i,infds,inptab,j,len,nr,outfds,outtab,st;

    if not(s!.isactive) then return fail; fi;

    hadactivity := false;
    repeat
        activity := false;
        # First we check whether some output from the queue has to be sent:
        j := 1;
        while j <= Length(s!.outqueue) do
            i := s!.outqueue[j][1];
            if s!.tosend[i] = 0 then   # idle
                st := Concatenation("00000000",s!.outqueue[j][2]);
                # the first 8 will be the length
                len := Length(st);
                StoreLenIn8Bytes(st,len-8);
                s!.outbuf[i] := st;
                s!.tosend[i] := len;
                Remove(s!.outqueue,j);
            else
                # still busy with an earlier message, try again later
                j := j + 1;
            fi;
        od;

        # Now do a select:
        # inptab and outtab translate positions in infds and outfds back
        # to connection numbers, 0 stands for the serving socket.
        infds := EmptyPlist(Length(s!.connections)+1);
        outfds := EmptyPlist(Length(s!.connections));
        inptab := EmptyPlist(Length(s!.connections)+1);
        outtab := EmptyPlist(Length(s!.connections));
        for i in [1..Length(s!.connections)] do
            if IsBound(s!.connections[i]) then
                if IsBound(s!.connections[i][1]) then
                    Add(infds,s!.connections[i][1]);
                    Add(inptab,i);
                fi;
                if IsBound(s!.connections[i][2]) and s!.tosend[i] <> 0 then
                    Add(outfds,s!.connections[i][2]);
                    Add(outtab,i);
                fi;
            fi;
        od;
        if s!.sock <> fail then
            Add(infds,s!.sock);
            Add(inptab,0);
        fi;
        if block and not(hadactivity) then
            nr := IO_select(infds,outfds,[],false,false);
        else
            nr := IO_select(infds,outfds,[],0,0);
        fi;
        # IO_select reports an error by returning fail, this counts as
        # "nothing to do" and is tested explicitly instead of relying on
        # a comparison between fail and an integer.
        if nr <> fail and nr > 0 then
            # Look for possible output first:
            # (entries that are not ready have been replaced by fail)
            for j in [1..Length(outfds)] do
                if outfds[j] <> fail then
                    activity := true;
                    i := outtab[j];
                    bytes := IO_write(s!.connections[i][2],s!.outbuf[i],
                                      Length(s!.outbuf[i])-s!.tosend[i],
                                      s!.tosend[i]);
                    if bytes = fail or bytes <= 0 then   # an error
                        CloseConnection(s,i);
                        # maybe we want to have a callback here!
                    else
                        s!.tosend[i] := s!.tosend[i] - bytes;
                        if s!.tosend[i] = 0 then
                            Unbind(s!.outbuf[i]);
                        fi;
                    fi;
                fi;
            od;
            # Now look for possible inputs next:
            # We need to remember that some connections might already
            # be closed by the output routine above!
            for j in [1..Length(infds)] do
                if infds[j] <> fail then
                    activity := true;
                    i := inptab[j];
                    if i = 0 then
                        AcceptNewConnection(s);
                    else
                        if IsBound(s!.connections[i]) then
                            if s!.torecv[i] = 0 then   # read length
                                bytes := IO_read(s!.connections[i][1],
                                   s!.inbuf[i],Length(s!.inbuf[i]),
                                   8-Length(s!.inbuf[i]));
                                if bytes = fail or bytes <= 0 then
                                    # an error or end of file
                                    CloseConnection(s,i);
                                    # maybe we want to have a callback here!
                                    continue;
                                fi;
                                if Length(s!.inbuf[i]) = 8 then
                                    s!.torecv[i]:=GetLenFrom8Bytes(s!.inbuf[i]);
                                    if s!.torecv[i] = 0 then
                                        # An empty message is complete
                                        # with its header, deliver it
                                        # instead of losing it silently:
                                        Add(s!.inqueue,[i,EmptyString(0)]);
                                        s!.inbuf[i] := EmptyString(8);
                                    else
                                        s!.inbuf[i] :=
                                            EmptyString(s!.torecv[i]);
                                    fi;
                                fi;
                            else   # we are in the reading process
                                bytes := IO_read(s!.connections[i][1],
                                   s!.inbuf[i],Length(s!.inbuf[i]),
                                   s!.torecv[i]-Length(s!.inbuf[i]));
                                if bytes = fail or bytes <= 0 then
                                    # an error or end of file
                                    CloseConnection(s,i);
                                    # maybe we want to have a callback here!
                                    continue;
                                fi;
                                if Length(s!.inbuf[i]) = s!.torecv[i] then
                                    # message complete, queue it and get
                                    # ready for the next header
                                    Add(s!.inqueue,[i,s!.inbuf[i]]);
                                    s!.torecv[i] := 0;
                                    s!.inbuf[i] := EmptyString(8);
                                fi;
                            fi;
                        fi;
                    fi;
                fi;
            od;
        fi;
        if activity then hadactivity := true; fi;
    until activity = false;
    return hadactivity;
  end );
372
373
374##
375##  This program is free software: you can redistribute it and/or modify
376##  it under the terms of the GNU General Public License as published by
377##  the Free Software Foundation, either version 3 of the License, or
378##  (at your option) any later version.
379##
380##  This program is distributed in the hope that it will be useful,
381##  but WITHOUT ANY WARRANTY; without even the implied warranty of
382##  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
383##  GNU General Public License for more details.
384##
385##  You should have received a copy of the GNU General Public License
386##  along with this program.  If not, see <http://www.gnu.org/licenses/>.
387##
388