#############################################################################
##
##  iohub.gd               GAP 4 package IO
##                                                           Max Neunhoeffer
##
##  Copyright (C) by Max Neunhoeffer
##  This file is free software, see license information at the end.
##
##  This file contains functions for a generic client server framework
##  for GAP
##
##  Main points:
##
##   - handle multiple connections using IO multiplexing
##   - single threaded
##   - use pickling for data transfer
##
##  Layout of an IO hub object (all components are set up by `IOHub'):
##
##   sock         file descriptor of the serving socket or `fail'
##   connections  list, entry number i is a list l describing connection
##                number i: l[1] is the fd to read from (if bound) and
##                l[2] is the fd to write to (if bound)
##   inqueue      list of pairs [connection number, received message]
##   outqueue     list of pairs [connection number, message to send]
##   outbuf       per connection: the frame currently being sent
##   tosend       per connection: number of bytes of outbuf still to send
##   inbuf        per connection: bytes received so far for header/message
##   torecv       per connection: 0 while an 8 byte length header is
##                expected, otherwise the length of the message body
##   isactive     `false' once `Shutdown' has been called
##
##  Wire format: every message is preceded by its length, stored in
##  8 bytes with the least significant byte first.
##

# Creates a new, active IO hub without serving socket and connections.
InstallMethod( IOHub, "constructor without arguments", [ ],
  function( )
    local s;
    s := rec( sock := fail, inqueue := [], outqueue := [],
              tosend := [], torecv := [], inbuf := [], outbuf := [],
              connections := [], isactive := true );
    Objectify(IOHubType, s);
    return s;
  end );

# Creates a TCP socket, binds it to <address>:<port> and starts listening.
# Returns the file descriptor of the serving socket, or `fail' if any of
# the steps goes wrong (then the hub has no serving socket afterwards).
InstallMethod( AttachServingSocket, "for an address and a port",
  [ IsIOHub, IsStringRep, IsPosInt ],
  function( s, address, port )
    # Do not leak a serving socket that was attached earlier:
    if s!.sock <> fail then
        IO_close(s!.sock);
        s!.sock := fail;
    fi;
    s!.sock := IO_socket(IO.PF_INET,IO.SOCK_STREAM,"tcp");
    if s!.sock = fail then return fail; fi;
    IO_setsockopt(s!.sock,IO.SOL_SOCKET,IO.SO_REUSEADDR,"\001\001\001\001");
    # Both calls report an error as `fail'; `not(fail)' would itself be
    # an error, so the result of IO_listen is compared explicitly.
    if IO_bind(s!.sock,IO_MakeIPAddressPort(address,port)) = fail or
       IO_listen(s!.sock,5) <> true then
        IO_close(s!.sock);
        s!.sock := fail;
        return fail;
    fi;
    return s!.sock;
  end );

# Closes connection number <nr>: drops all queued messages from and for
# this connection, closes its file descriptors and forgets its buffers.
# Returns `fail' if there is no connection with this number and nothing
# otherwise.
InstallMethod( CloseConnection, "for an IO hub and a positive integer",
  [ IsIOHub, IsPosInt ],
  function( s, nr )
    local conn,i;
    if not(IsBound(s!.connections[nr])) then
        return fail;
    fi;
    conn := s!.connections[nr];
    if IsBound(conn[1]) then
        # Remove all entries in the input queue from this connection.
        # Note that the entry has to be removed from the queue, the
        # index is only advanced if nothing was removed:
        i := 1;
        while i <= Length(s!.inqueue) do
            if s!.inqueue[i][1] = nr then
                Remove(s!.inqueue,i);
            else
                i := i + 1;
            fi;
        od;
        IO_close(conn[1]);
    fi;
    if IsBound(conn[2]) then
        # Remove all entries in the output queue for this connection:
        i := 1;
        while i <= Length(s!.outqueue) do
            if s!.outqueue[i][1] = nr then
                Remove(s!.outqueue,i);
            else
                i := i + 1;
            fi;
        od;
        # TCP connections use one fd for both directions, do not close
        # it a second time:
        if not(IsBound(conn[1])) or conn[1] <> conn[2] then
            IO_close(conn[2]);
        fi;
    fi;
    Unbind(s!.connections[nr]);
    Unbind(s!.tosend[nr]);
    Unbind(s!.torecv[nr]);
    Unbind(s!.inbuf[nr]);
    Unbind(s!.outbuf[nr]);
    Print("Connection #",nr," closed.\n");
  end );

# Closes the serving socket (if there is one). Existing connections are
# not touched.
InstallMethod( ShutdownServingSocket, "for an IO hub",
  [IsIOHub],
  function(s)
    if s!.sock <> fail then
        IO_close(s!.sock);
        s!.sock := fail;
    fi;
  end );

# Closes all connections and the serving socket and marks the hub as
# inactive. Does nothing if the hub is already shut down.
InstallMethod( Shutdown, "for an IO hub",
  [ IsIOHub ],
  function( s )
    local i;
    if not(s!.isactive) then return; fi;
    # The range is computed once; CloseConnection ignores numbers
    # that are not (or no longer) bound.
    for i in [1..Length(s!.connections)] do
        CloseConnection(s,i);
    od;
    ShutdownServingSocket(s);
    s!.isactive := false;
  end );

# Prints a short description of the hub: whether it has a serving socket
# and how many connections it reads from and writes to.
InstallMethod( ViewObj, "for a tcp server",
  [ IsIOHub ],
  function( s )
    local nr;
    if s!.isactive then
        Print("<IO hub");
        if s!.sock <> fail then
            Print(" with serving socket");
        fi;
        nr := Number([1..Length(s!.connections)],
                     i->IsBound(s!.connections[i]) and
                        IsBound(s!.connections[i][1]));
        if nr > 0 then
            Print(", reading from ",nr," fds");
        fi;
        nr := Number([1..Length(s!.connections)],
                     i->IsBound(s!.connections[i]) and
                        IsBound(s!.connections[i][2]));
        if nr > 0 then
            Print(", writing to ",nr," fds");
        fi;
        Print(">");
    else
        Print("<IO hub already shut down>");
    fi;
  end );

# Registers a new connection reading from file descriptor <inp> and
# writing to file descriptor <out>. A direction is only set up if its
# descriptor is positive. Returns the number of the new connection or
# `fail' if the hub is shut down.
InstallMethod( NewConnection, "for an IO hub and two integers",
  [ IsIOHub, IsInt, IsInt ],
  function( s, inp, out )
    local i,l;
    if not(s!.isactive) then return fail; fi;
    # Numbers of open connections are never reused. Note however that
    # the length of the list shrinks when the connection with the
    # highest number is closed, so such a number can be handed out again.
    i := Length(s!.connections)+1;
    l := [];
    if inp > 0 then l[1] := inp; fi;
    if out > 0 then l[2] := out; fi;
    s!.connections[i] := l;
    s!.tosend[i] := 0;
    s!.torecv[i] := 0;
    s!.inbuf[i] := EmptyString(8);
    s!.outbuf[i] := "";
    return i;
  end );

# Opens a TCP connection to <address>:<port> and registers it as a new
# connection for both reading and writing. Returns the connection number
# or `fail' if the hub is shut down or the connection cannot be made.
InstallMethod( NewTCPConnection, "for an IO hub, an address and a port",
  [ IsIOHub, IsStringRep, IsPosInt ],
  function( s, address, port )
    local t;
    # NewConnection would refuse anyway, do not open (and leak) a socket:
    if not(s!.isactive) then return fail; fi;
    t := IO_socket(IO.PF_INET,IO.SOCK_STREAM,"tcp");
    if t = fail then return fail; fi;
    if IO_connect(t,IO_MakeIPAddressPort(address,port)) = fail then
        IO_close(t);
        return fail;
    fi;
    return NewConnection(s,t,t);
  end );

# Accepts a pending connection on the serving socket and registers it.
# Returns the number of the new connection or `fail' if the hub is shut
# down, has no serving socket or the accept fails.
InstallMethod( AcceptNewConnection, "for an IO hub",
  [ IsIOHub ],
  function( s )
    local t,i;
    # s!.sock is always bound, a missing serving socket is marked by
    # the value `fail':
    if not(s!.isactive) or s!.sock = fail then
        return fail;
    fi;
    t := IO_accept(s!.sock,IO_MakeIPAddressPort("0.0.0.0",0));
    if t = fail then
        return fail;
    fi;
    i := NewConnection( s, t, t );
    Print("Got new connection #",i,"...\n");
    return i;
  end );

# Takes the oldest message out of the input queue and returns it as a
# pair [connection number, message]. For <i> equal to 0 messages from all
# connections are considered, otherwise only those from connection <i>.
# Returns `false' if there is no such message and `fail' if the hub is
# shut down.
InstallMethod( GetInput, "for an IO hub and an integer",
  [ IsIOHub, IsInt ],
  function( s, i )
    local p;
    if not(s!.isactive) then return fail; fi;
    if i = 0 then    # get something from any connection
        if Length(s!.inqueue) = 0 then
            return false;
        fi;
        return Remove(s!.inqueue,1);
    fi;
    p := PositionProperty(s!.inqueue, entry -> entry[1] = i);
    if p = fail then
        return false;
    fi;
    return Remove(s!.inqueue,p);
  end );

# Appends the message <o> for connection <i> to the output queue, it is
# sent by later calls to `DoIO'. Returns `true'. Runs into an error (and
# returns `fail' if the user returns from it) if connection <i> does not
# exist or cannot be written to.
InstallMethod( SubmitOutput, "for an IO hub, a positive integers and an obj",
  [ IsIOHub, IsPosInt, IsStringRep ],
  function( s, i, o )
    if not(IsBound(s!.connections[i]) and IsBound(s!.connections[i][2])) then
        Error("This connection is closed or has no output");
        return fail;
    fi;
    Add(s!.outqueue,[i,o]);
    return true;
  end );

# Returns the output queue itself (not a copy).
InstallMethod( OutputQueue, "for an IO hub",
  [ IsIOHub ],
  function( s ) return s!.outqueue; end );

# Returns the input queue itself (not a copy).
InstallMethod( InputQueue, "for an IO hub",
  [ IsIOHub ],
  function( s ) return s!.inqueue; end );

# Stores <len> into the first 8 characters of the string <st>, least
# significant byte first. This is the inverse of `GetLenFrom8Bytes'.
InstallMethod( StoreLenIn8Bytes, "for a string and a len",
  [IsStringRep, IsInt],
  function( st, len )
    local c,i;
    for i in [1..8] do
        c := len mod 256;
        st[i] := CHAR_INT(c);
        len := (len - c) / 256;
    od;
  end );

# Reads a length from the first 8 characters of the string <st>, least
# significant byte first. This is the inverse of `StoreLenIn8Bytes'.
InstallMethod( GetLenFrom8Bytes, "for a string",
  [IsStringRep],
  function( st )
    local len,i;
    len := 0;
    for i in [8,7..1] do
        len := len * 256 + INT_CHAR(st[i]);
    od;
    return len;
  end );

# The non-blocking variant of `DoIO'.
InstallMethod( DoIO, "for an IO hub", [ IsIOHub ],
  function( s ) return DoIO(s,false); end );

InstallMethod( DoIO, "for an IO hub and a boolean",
  [ IsIOHub, IsBool ],
  function( s, block )
    # This uses select to see to all open connections including the
    # original socket to perform all possible IO on them. New connections
    # are created if needed and those to which network connectivity is
    # lost are closed.
    # Note that this does not automatically call the worker on the input
    # queue.
    # However, it does serve the output queue.
    # If <block> is `true', the first select waits until something can
    # be done. The result is `true' if any IO was performed, `false' if
    # not and `fail' if the hub is shut down.
    local activity,bytes,hadactivity,i,infds,inptab,j,len,nr,outfds,outtab,
          st,want;

    if not(s!.isactive) then return fail; fi;

    hadactivity := false;
    repeat
        activity := false;
        # First we check whether some output from the queue has to be sent:
        j := 1;
        while j <= Length(s!.outqueue) do
            i := s!.outqueue[j][1];
            if s!.tosend[i] = 0 then   # idle
                st := Concatenation("00000000",s!.outqueue[j][2]);
                # the first 8 will be the length
                len := Length(st);
                StoreLenIn8Bytes(st,len-8);
                s!.outbuf[i] := st;
                s!.tosend[i] := len;
                Remove(s!.outqueue,j);
            else
                # still busy with an earlier message, try again later
                j := j + 1;
            fi;
        od;

        # Now do a select. inptab and outtab translate positions in
        # infds and outfds back to connection numbers, 0 stands for the
        # serving socket:
        infds := EmptyPlist(Length(s!.connections)+1);
        outfds := EmptyPlist(Length(s!.connections));
        inptab := EmptyPlist(Length(s!.connections)+1);
        outtab := EmptyPlist(Length(s!.connections));
        for i in [1..Length(s!.connections)] do
            if IsBound(s!.connections[i]) then
                if IsBound(s!.connections[i][1]) then
                    Add(infds,s!.connections[i][1]);
                    Add(inptab,i);
                fi;
                if IsBound(s!.connections[i][2]) and s!.tosend[i] <> 0 then
                    Add(outfds,s!.connections[i][2]);
                    Add(outtab,i);
                fi;
            fi;
        od;
        if s!.sock <> fail then
            Add(infds,s!.sock);
            Add(inptab,0);
        fi;
        if Length(infds) = 0 and Length(outfds) = 0 then
            # Nothing to wait for, a blocking select would never return.
            break;
        fi;
        if block and not(hadactivity) then
            nr := IO_select(infds,outfds,[],false,false);
        else
            nr := IO_select(infds,outfds,[],0,0);
        fi;
        # IO_select reports an error as `fail', which must not be
        # compared with an integer. After a successful select the
        # entries of descriptors that are not ready are `fail'.
        if nr <> fail and nr > 0 then
            # Look for possible output first:
            for j in [1..Length(outfds)] do
                if outfds[j] <> fail then
                    activity := true;
                    i := outtab[j];
                    bytes := IO_write(s!.connections[i][2],s!.outbuf[i],
                                      Length(s!.outbuf[i])-s!.tosend[i],
                                      s!.tosend[i]);
                    if bytes = fail or bytes <= 0 then   # an error
                        CloseConnection(s,i);
                        # maybe we want to have a callback here!
                    else
                        s!.tosend[i] := s!.tosend[i] - bytes;
                        if s!.tosend[i] = 0 then
                            Unbind(s!.outbuf[i]);
                        fi;
                    fi;
                fi;
            od;
            # Now look for possible inputs next:
            # We need to remember that some connections might already
            # be closed by the output routine above!
            for j in [1..Length(infds)] do
                if infds[j] = fail then
                    continue;
                fi;
                activity := true;
                i := inptab[j];
                if i = 0 then
                    AcceptNewConnection(s);
                    continue;
                fi;
                if not(IsBound(s!.connections[i])) then
                    continue;
                fi;
                # Either the 8 byte length header or the message body
                # is read, new bytes are appended to the buffer:
                if s!.torecv[i] = 0 then
                    want := 8;
                else
                    want := s!.torecv[i];
                fi;
                bytes := IO_read(s!.connections[i][1],
                                 s!.inbuf[i],Length(s!.inbuf[i]),
                                 want-Length(s!.inbuf[i]));
                if bytes = fail or bytes <= 0 then   # error or end of file
                    CloseConnection(s,i);
                    # maybe we want to have a callback here!
                    continue;
                fi;
                if Length(s!.inbuf[i]) < want then
                    continue;     # not yet complete
                fi;
                if s!.torecv[i] = 0 then    # the header is complete
                    len := GetLenFrom8Bytes(s!.inbuf[i]);
                    if len = 0 then
                        # An empty message has no body to wait for,
                        # deliver it right away:
                        Add(s!.inqueue,[i,""]);
                        s!.inbuf[i] := EmptyString(8);
                    else
                        # NOTE(review): the length comes from the peer
                        # and is not limited here, a broken or hostile
                        # peer can request a very large buffer.
                        s!.torecv[i] := len;
                        s!.inbuf[i] := EmptyString(len);
                    fi;
                else                        # the message is complete
                    Add(s!.inqueue,[i,s!.inbuf[i]]);
                    s!.torecv[i] := 0;
                    s!.inbuf[i] := EmptyString(8);
                fi;
            od;
        fi;
        if activity then hadactivity := true; fi;
    until activity = false;
    return hadactivity;
  end );


##
##  This program is free software: you can redistribute it and/or modify
##  it under the terms of the GNU General Public License as published by
##  the Free Software Foundation, either version 3 of the License, or
##  (at your option) any later version.
##
##  This program is distributed in the hope that it will be useful,
##  but WITHOUT ANY WARRANTY; without even the implied warranty of
##  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
##  GNU General Public License for more details.
##
##  You should have received a copy of the GNU General Public License
##  along with this program.  If not, see <http://www.gnu.org/licenses/>.
##