1 /*
2 
3     This file is part of the Maude 2 interpreter.
4 
5     Copyright 1997-2003 SRI International, Menlo Park, CA 94025, USA.
6 
7     This program is free software; you can redistribute it and/or modify
8     it under the terms of the GNU General Public License as published by
9     the Free Software Foundation; either version 2 of the License, or
10     (at your option) any later version.
11 
12     This program is distributed in the hope that it will be useful,
13     but WITHOUT ANY WARRANTY; without even the implied warranty of
14     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15     GNU General Public License for more details.
16 
17     You should have received a copy of the GNU General Public License
18     along with this program; if not, write to the Free Software
19     Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
20 
21 */
22 
23 //
24 //	Main socket manipulation code.
25 //
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <netinet/in.h>
31 #include <netdb.h>
32 #include <arpa/inet.h>
33 #include <errno.h>
34 
35 bool
getPort(DagNode * portArg,int & port)36 SocketManagerSymbol::getPort(DagNode* portArg, int& port)
37 {
38   return succSymbol->getSignedInt(portArg, port) && port <= 65535;  // HACK
39 }
40 
41 bool
getActiveSocket(DagNode * socketArg,int & socketId,ActiveSocket * & asp)42 SocketManagerSymbol::getActiveSocket(DagNode* socketArg, int& socketId, ActiveSocket*& asp)
43 {
44   if (socketArg->symbol() == socketOidSymbol)
45     {
46       DagNode* idArg = safeCast(FreeDagNode*, socketArg)->getArgument(0);
47       if (succSymbol->getSignedInt(idArg, socketId))
48 	{
49 	  SocketMap::iterator i = activeSockets.find(socketId);
50 	  if (activeSockets.find(socketId) != activeSockets.end())
51 	    {
52 	      asp = &(i->second);
53 	      return true;
54 	    }
55 	}
56     }
57   return false;
58 }
59 
60 bool
getText(DagNode * textArg,Rope & text)61 SocketManagerSymbol::getText(DagNode* textArg, Rope& text)
62 {
63   if (textArg->symbol() == stringSymbol)
64     {
65       text = safeCast(StringDagNode*, textArg)->getValue();
66       return true;
67     }
68   return false;
69 }
70 
71 bool
setNonblockingFlag(int fd,FreeDagNode * message,ObjectSystemRewritingContext & context)72 SocketManagerSymbol::setNonblockingFlag(int fd, FreeDagNode* message, ObjectSystemRewritingContext& context)
73 {
74   //
75   //	Set nonblocking flag for a nascent socket; since it is not yet an external object we
76   //	can just close it and generate an error reply if things don't work out.
77   //
78   int flags = fcntl(fd, F_GETFL);
79   if (flags == -1)
80     {
81       const char* errText = strerror(errno);
82       DebugAdvisory("unexpected fcntl() GETFL: " << errText);
83       close(fd);
84       errorReply(errText, message, context);
85       return false;
86     }
87   if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
88     {
89       const char* errText = strerror(errno);
90       DebugAdvisory("unexpected fcntl() GETFL: " << errText);
91       close(fd);
92       errorReply(errText, message, context);
93       return false;
94     }
95   return true;
96 }
97 
98 bool
createClientTcpSocket(FreeDagNode * message,ObjectSystemRewritingContext & context)99 SocketManagerSymbol::createClientTcpSocket(FreeDagNode* message, ObjectSystemRewritingContext& context)
100 {
101   Assert(message->getArgument(0)->symbol() == this, "misdirected message");
102   int port;
103   DagNode* addressArg = message->getArgument(2);
104 
105   if (getPort(message->getArgument(3), port) && addressArg->symbol() == stringSymbol)
106     {
107       //
108       //	We accept the message.
109       //
110 
111       //
112       //	Look up the address.
113       //
114       const Rope& address = safeCast(StringDagNode*, addressArg)->getValue();
115       char* addressStr = address.makeZeroTerminatedString();
116       hostent* record = gethostbyname(addressStr);  // HACK - might block
117       delete [] addressStr;
118       if (record == 0)
119 	{
120 	  DebugAdvisory("unexpected gethostbyname(() error: " << strerror(errno));
121 	  errorReply("bad address", message, context);
122 	  return true;
123 	}
124       //
125       //	Create a socket.
126       //
127       int fd = socket(PF_INET, SOCK_STREAM, 0);
128       if (fd == -1)
129 	{
130 	  const char* errText = strerror(errno);
131 	  DebugAdvisory("unexpected socket() error: " << errText);
132 	  errorReply(errText, message, context);
133 	  return true;
134 	}
135       //
136       //	Make it non-blocking.
137       //
138       if (!setNonblockingFlag(fd, message, context))
139 	return true;
140       //
141       //	Try to connect to host.
142       //
143       sockaddr_in sockName;
144       sockName.sin_family = AF_INET;
145       sockName.sin_port = htons(port);
146       sockName.sin_addr = *(reinterpret_cast<in_addr*>(record->h_addr_list[0]));  // HACK
147       if (connect(fd, reinterpret_cast<sockaddr*>(&sockName), sizeof(sockName)) == 0)
148 	{
149 	  createdSocketReply(fd, message, context);  // instant success
150 	  activeSockets[fd].state = NOMINAL;  // this creates the ActiveSocket object
151 	}
152       else if (errno == EINPROGRESS)
153 	{
154 	  //
155 	  //	Incomplete transaction on an asynchronous socket, so save details
156 	  //	so we know what to do when transaction completes.
157 	  //
158 	  ActiveSocket& as = activeSockets[fd];  // this creates the ActiveSocket object
159 	  as.state = WAITING_TO_CONNECT;
160 	  as.lastMessage.setNode(message);
161 	  as.originalContext = &context;
162 	  //
163 	  //	Completion (could be success or failure) is indicated by the operation system
164 	  //	making the socket writable.
165 	  //
166 	  wantTo(WRITE, fd);
167 	}
168       else
169 	{
170 	  //
171 	  //	Connect failed, so report error and pretend that socket never existed.
172 	  //
173 	  DebugAdvisory("unexpected connect() error: " << strerror(errno));
174 	  close(fd);
175 	  errorReply("failed to connect", message, context);
176 	}
177       return true;
178     }
179   IssueAdvisory("socket manager declined malformed message " << QUOTE(message) << '.');
180   return false;
181 }
182 
183 bool
createServerTcpSocket(FreeDagNode * message,ObjectSystemRewritingContext & context)184 SocketManagerSymbol::createServerTcpSocket(FreeDagNode* message, ObjectSystemRewritingContext& context)
185 {
186   Assert(message->getArgument(0)->symbol() == this, "misdirected message");
187   int port;
188   int backlog;
189 
190   if (getPort(message->getArgument(2), port) &&
191       succSymbol->getSignedInt(message->getArgument(3), backlog) &&
192       backlog > 0)
193     {
194       //
195       //	Create a socket.
196       //
197       int fd = socket(PF_INET, SOCK_STREAM, 0);
198       if (fd == -1)
199 	{
200 	  const char* errText = strerror(errno);
201 	  DebugAdvisory("unexpected socket() error: " << errText);
202 	  errorReply(errText, message, context);
203 	  return true;
204 	}
205       //
206       //	Make it non-blocking.
207       //
208       if (!setNonblockingFlag(fd, message, context))
209 	return true;
210       {
211 	//
212 	//	Set SO_REUSEADDR so port can be immediately reused following the close()
213 	//	of this socket.
214 	//
215 	int value = 1;
216 	if (setsockopt(fd, SOL_SOCKET,  SO_REUSEADDR,  &value, sizeof(value)) == -1)
217 	  {
218 	    const char* errText = strerror(errno);
219 	    DebugAdvisory("setsockopt(SO_REUSEADDR) failed: " << errText);
220 	    errorReply(errText, message, context);
221 	    close(fd);
222 	    return true;
223 	  }
224       }
225       {
226 	//
227 	//	Bind it to the local port.
228 	//
229 	sockaddr_in sockName;
230 	sockName.sin_family = AF_INET;
231 	sockName.sin_port = htons(port);
232 	sockName.sin_addr.s_addr = htonl(INADDR_ANY);  // HACK - what is the portable way to set this?
233 	if (::bind(fd, reinterpret_cast<sockaddr*>(&sockName), sizeof(sockName)) == -1)
234 	  {
235 	    const char* errText = strerror(errno);
236 	    DebugAdvisory("unexpected bind() error with fd " << fd << ": " << errText);
237 	    errorReply(errText, message, context);
238 	    close(fd);
239 	    return true;
240 	  }
241       }
242       //
243       //	Start listening for connections.
244       //
245       if (listen(fd, backlog) == -1)
246 	{
247 	  const char* errText = strerror(errno);
248 	  DebugAdvisory("unexpected listen() error: " << errText);
249 	  errorReply(errText, message, context);
250 	  close(fd);
251 	  return true;
252 	}
253       //
254       //	Return a message now that we have a bound and listening server socket.
255       //
256       createdSocketReply(fd, message, context);
257       activeSockets[fd].state = LISTENING;  // this creates the ActiveSocket object
258      return true;
259     }
260   IssueAdvisory("socket manager declined malformed message " << QUOTE(message) << '.');
261   return false;
262 }
263 
264 bool
acceptClient(FreeDagNode * message,ObjectSystemRewritingContext & context)265 SocketManagerSymbol::acceptClient(FreeDagNode* message, ObjectSystemRewritingContext& context)
266 {
267   int socketId;
268   ActiveSocket* asp;
269   DagNode* socketName = message->getArgument(0);
270   if (getActiveSocket(socketName, socketId, asp))
271     {
272       ActiveSocket& as = *asp;
273       if (as.state == LISTENING)
274 	{
275 	  sockaddr_in sockName;
276 	  socklen_t addrLen = sizeof(sockName);
277 	  int r;
278 	  do
279 	    r = accept(socketId, reinterpret_cast<sockaddr*>(&sockName), &addrLen);
280 	  while (r == -1 && errno == EINTR);
281 	  if (r >= 0)
282 	    {
283 	      if (setNonblockingFlag(r, message, context))
284 		{
285 		  acceptedClientReply(inet_ntoa(sockName.sin_addr), r, message, context);
286 		  activeSockets[r].state = NOMINAL;  // this creates the new ActiveSocket object
287 		}
288 	    }
289 	  else if (errno == EAGAIN)
290 	    {
291 	      as.state = WAITING_TO_ACCEPT;
292 	      as.lastMessage.setNode(message);
293 	      as.originalContext = &context;
294 	      wantTo(READ, socketId);
295 	    }
296 	  else
297 	    {
298 	      //
299 	      //	What should we do with a socket that we failed to accept on?
300 	      //
301 	      const char* errText = strerror(errno);
302 	      DebugAdvisory("unexpected accept() error: " << errText);
303 	      errorReply(errText, message, context);
304 	    }
305 	  return true;
306 	}
307       IssueAdvisory(socketName << " declined message " << QUOTE(message) << '.');
308       return false;
309     }
310   IssueAdvisory("no socket to receive message " << QUOTE(message) << '.');
311   return false;
312 }
313 
314 bool
send(FreeDagNode * message,ObjectSystemRewritingContext & context)315 SocketManagerSymbol::send(FreeDagNode* message, ObjectSystemRewritingContext& context)
316 {
317   int socketId;
318   ActiveSocket* asp;
319   Rope text;
320   DagNode* socketName = message->getArgument(0);
321   if (getActiveSocket(socketName, socketId, asp) &&
322       getText(message->getArgument(2), text) &&
323       !(text.empty()))
324     {
325       //ActiveSocket& as = activeSockets[socketId];
326       ActiveSocket& as = *asp;
327       if ((as.state & ~WAITING_TO_READ) == 0)  // check that all the state bits other than WAITING_TO_READ are clear
328 	{
329 	  //as.text = text;
330 	  as.textArray = text.makeZeroTerminatedString();
331 	  //as.unsent = as.text.c_str();
332 	  as.unsent = as.textArray;
333 	  as.nrUnsent = text.length();  // how to deal with empty message?
334 	  //
335 	  //	Write some characters to the socket; we might get interrupted and have to restart.
336 	  //
337 	  ssize_t n;
338 	  do
339 	    n = write(socketId, as.unsent, as.nrUnsent);
340 	  while (n == -1 && errno == EINTR);
341 
342 	  if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))  // treat blocking situation as zero chars send
343 	    n = 0;
344 	  if (n >= 0)
345 	    {
346 	      as.nrUnsent -= n;
347 	      if (as.nrUnsent == 0)  // done
348 		{
349 		  sentMsgReply(message, context);
350 		  // clear  as.text
351 		  delete [] as.textArray;
352 		  as.textArray = 0;
353 		}
354 	      else  // at least some characters pending
355 		{
356 		  as.state |= WAITING_TO_WRITE;
357 		  as.lastMessage.setNode(message);
358 		  as.originalContext = &context;
359 		  as.unsent += n;
360 		  wantTo(WRITE, socketId);
361 		}
362 	    }
363 	  else
364 	    {
365 	      const char* errText = strerror(errno);
366 	      DebugAdvisory("unexpected write() error : " << errText);
367 	      closedSocketReply(socketId, errText, message, context);
368 	    }
369 	  return true;
370 	}
371       IssueAdvisory(socketName << " declined message " << QUOTE(message) << '.');
372       return false;
373     }
374   IssueAdvisory("no socket to receive message " << QUOTE(message) << '.');
375   return false;
376 }
377 
378 bool
receive(FreeDagNode * message,ObjectSystemRewritingContext & context)379 SocketManagerSymbol::receive(FreeDagNode* message, ObjectSystemRewritingContext& context)
380 {
381   int socketId;
382   ActiveSocket* asp;
383   DagNode* socketName = message->getArgument(0);
384   if (getActiveSocket(socketName, socketId, asp))
385     {
386       //ActiveSocket& as = activeSockets[socketId];
387       ActiveSocket& as = *asp;
388       if ((as.state & ~WAITING_TO_WRITE) == 0)  // check that all the state bits other than WAITING_TO_WRITE are clear
389 	{
390 	  char buffer[READ_BUFFER_SIZE];
391 	  ssize_t n;
392 	  do
393 	    n = read(socketId, buffer, READ_BUFFER_SIZE);
394 	  while (n == -1 && errno == EINTR);
395 
396 	  if (n > 0)
397 	    receivedMsgReply(buffer, n, message, context);
398 	  else
399 	    {
400 	      if (n == -1)
401 		{
402 		  if (errno == EAGAIN)
403 		    {
404 		      as.state |= WAITING_TO_READ;
405 		      as.lastMessage.setNode(message);
406 		      as.originalContext = &context;
407 		      wantTo(READ, socketId);
408 		    }
409 		  else
410 		    {
411 		      const char* errText = strerror(errno);
412 		      DebugAdvisory("unexpected read() error: " << errText);
413 		      closedSocketReply(socketId, errText, message, context);
414 		    }
415 		}
416 	      else
417 		{
418 		  DebugAdvisory("read 0 bytes");
419 		  closedSocketReply(socketId, "", message, context);
420 		}
421 	    }
422 	  return true;
423 	}
424       else
425 	DebugAdvisory("as.state = " << as.state);
426       IssueAdvisory(socketName << " declined message " << QUOTE(message) << '.');
427       return false;
428     }
429   IssueAdvisory("no socket to receive message " << QUOTE(message) << '.');
430   return false;
431 }
432 
433 bool
closeSocket(FreeDagNode * message,ObjectSystemRewritingContext & context)434 SocketManagerSymbol::closeSocket(FreeDagNode* message, ObjectSystemRewritingContext& context)
435 {
436   int socketId;
437   ActiveSocket* asp;
438   DagNode* socketName = message->getArgument(0);
439   if (getActiveSocket(socketName, socketId, asp))
440     {
441       closedSocketReply(socketId, "", message, context);
442       return true;
443     }
444   IssueAdvisory("no socket to receive message " << QUOTE(message) << '.');
445   return false;
446 }
447 
448 void
cleanUp(DagNode * objectId)449 SocketManagerSymbol::cleanUp(DagNode* objectId)
450 {
451   int socketId;
452   ActiveSocket* asp;
453   if (getActiveSocket(objectId, socketId, asp))
454     {
455       DebugAdvisory("cleaning up " << objectId);
456       close(socketId);
457       activeSockets.erase(socketId);
458       PseudoThread::clearFlags(socketId);  // to avoid eventLoop() testing an invalid fd
459     }
460   else
461     CantHappen("no socket for " << objectId);
462 }
463