1 /*
2
3 This file is part of the Maude 2 interpreter.
4
5 Copyright 1997-2003 SRI International, Menlo Park, CA 94025, USA.
6
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 2 of the License, or
10 (at your option) any later version.
11
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
16
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, write to the Free Software
19 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
20
21 */
22
23 //
24 // Main socket manipulation code.
25 //
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <netinet/in.h>
31 #include <netdb.h>
32 #include <arpa/inet.h>
33 #include <errno.h>
34
35 bool
getPort(DagNode * portArg,int & port)36 SocketManagerSymbol::getPort(DagNode* portArg, int& port)
37 {
38 return succSymbol->getSignedInt(portArg, port) && port <= 65535; // HACK
39 }
40
41 bool
getActiveSocket(DagNode * socketArg,int & socketId,ActiveSocket * & asp)42 SocketManagerSymbol::getActiveSocket(DagNode* socketArg, int& socketId, ActiveSocket*& asp)
43 {
44 if (socketArg->symbol() == socketOidSymbol)
45 {
46 DagNode* idArg = safeCast(FreeDagNode*, socketArg)->getArgument(0);
47 if (succSymbol->getSignedInt(idArg, socketId))
48 {
49 SocketMap::iterator i = activeSockets.find(socketId);
50 if (activeSockets.find(socketId) != activeSockets.end())
51 {
52 asp = &(i->second);
53 return true;
54 }
55 }
56 }
57 return false;
58 }
59
60 bool
getText(DagNode * textArg,Rope & text)61 SocketManagerSymbol::getText(DagNode* textArg, Rope& text)
62 {
63 if (textArg->symbol() == stringSymbol)
64 {
65 text = safeCast(StringDagNode*, textArg)->getValue();
66 return true;
67 }
68 return false;
69 }
70
71 bool
setNonblockingFlag(int fd,FreeDagNode * message,ObjectSystemRewritingContext & context)72 SocketManagerSymbol::setNonblockingFlag(int fd, FreeDagNode* message, ObjectSystemRewritingContext& context)
73 {
74 //
75 // Set nonblocking flag for a nascent socket; since it is not yet an external object we
76 // can just close it and generate an error reply if things don't work out.
77 //
78 int flags = fcntl(fd, F_GETFL);
79 if (flags == -1)
80 {
81 const char* errText = strerror(errno);
82 DebugAdvisory("unexpected fcntl() GETFL: " << errText);
83 close(fd);
84 errorReply(errText, message, context);
85 return false;
86 }
87 if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
88 {
89 const char* errText = strerror(errno);
90 DebugAdvisory("unexpected fcntl() GETFL: " << errText);
91 close(fd);
92 errorReply(errText, message, context);
93 return false;
94 }
95 return true;
96 }
97
98 bool
createClientTcpSocket(FreeDagNode * message,ObjectSystemRewritingContext & context)99 SocketManagerSymbol::createClientTcpSocket(FreeDagNode* message, ObjectSystemRewritingContext& context)
100 {
101 Assert(message->getArgument(0)->symbol() == this, "misdirected message");
102 int port;
103 DagNode* addressArg = message->getArgument(2);
104
105 if (getPort(message->getArgument(3), port) && addressArg->symbol() == stringSymbol)
106 {
107 //
108 // We accept the message.
109 //
110
111 //
112 // Look up the address.
113 //
114 const Rope& address = safeCast(StringDagNode*, addressArg)->getValue();
115 char* addressStr = address.makeZeroTerminatedString();
116 hostent* record = gethostbyname(addressStr); // HACK - might block
117 delete [] addressStr;
118 if (record == 0)
119 {
120 DebugAdvisory("unexpected gethostbyname(() error: " << strerror(errno));
121 errorReply("bad address", message, context);
122 return true;
123 }
124 //
125 // Create a socket.
126 //
127 int fd = socket(PF_INET, SOCK_STREAM, 0);
128 if (fd == -1)
129 {
130 const char* errText = strerror(errno);
131 DebugAdvisory("unexpected socket() error: " << errText);
132 errorReply(errText, message, context);
133 return true;
134 }
135 //
136 // Make it non-blocking.
137 //
138 if (!setNonblockingFlag(fd, message, context))
139 return true;
140 //
141 // Try to connect to host.
142 //
143 sockaddr_in sockName;
144 sockName.sin_family = AF_INET;
145 sockName.sin_port = htons(port);
146 sockName.sin_addr = *(reinterpret_cast<in_addr*>(record->h_addr_list[0])); // HACK
147 if (connect(fd, reinterpret_cast<sockaddr*>(&sockName), sizeof(sockName)) == 0)
148 {
149 createdSocketReply(fd, message, context); // instant success
150 activeSockets[fd].state = NOMINAL; // this creates the ActiveSocket object
151 }
152 else if (errno == EINPROGRESS)
153 {
154 //
155 // Incomplete transaction on an asynchronous socket, so save details
156 // so we know what to do when transaction completes.
157 //
158 ActiveSocket& as = activeSockets[fd]; // this creates the ActiveSocket object
159 as.state = WAITING_TO_CONNECT;
160 as.lastMessage.setNode(message);
161 as.originalContext = &context;
162 //
163 // Completion (could be success or failure) is indicated by the operation system
164 // making the socket writable.
165 //
166 wantTo(WRITE, fd);
167 }
168 else
169 {
170 //
171 // Connect failed, so report error and pretend that socket never existed.
172 //
173 DebugAdvisory("unexpected connect() error: " << strerror(errno));
174 close(fd);
175 errorReply("failed to connect", message, context);
176 }
177 return true;
178 }
179 IssueAdvisory("socket manager declined malformed message " << QUOTE(message) << '.');
180 return false;
181 }
182
183 bool
createServerTcpSocket(FreeDagNode * message,ObjectSystemRewritingContext & context)184 SocketManagerSymbol::createServerTcpSocket(FreeDagNode* message, ObjectSystemRewritingContext& context)
185 {
186 Assert(message->getArgument(0)->symbol() == this, "misdirected message");
187 int port;
188 int backlog;
189
190 if (getPort(message->getArgument(2), port) &&
191 succSymbol->getSignedInt(message->getArgument(3), backlog) &&
192 backlog > 0)
193 {
194 //
195 // Create a socket.
196 //
197 int fd = socket(PF_INET, SOCK_STREAM, 0);
198 if (fd == -1)
199 {
200 const char* errText = strerror(errno);
201 DebugAdvisory("unexpected socket() error: " << errText);
202 errorReply(errText, message, context);
203 return true;
204 }
205 //
206 // Make it non-blocking.
207 //
208 if (!setNonblockingFlag(fd, message, context))
209 return true;
210 {
211 //
212 // Set SO_REUSEADDR so port can be immediately reused following the close()
213 // of this socket.
214 //
215 int value = 1;
216 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)) == -1)
217 {
218 const char* errText = strerror(errno);
219 DebugAdvisory("setsockopt(SO_REUSEADDR) failed: " << errText);
220 errorReply(errText, message, context);
221 close(fd);
222 return true;
223 }
224 }
225 {
226 //
227 // Bind it to the local port.
228 //
229 sockaddr_in sockName;
230 sockName.sin_family = AF_INET;
231 sockName.sin_port = htons(port);
232 sockName.sin_addr.s_addr = htonl(INADDR_ANY); // HACK - what is the portable way to set this?
233 if (::bind(fd, reinterpret_cast<sockaddr*>(&sockName), sizeof(sockName)) == -1)
234 {
235 const char* errText = strerror(errno);
236 DebugAdvisory("unexpected bind() error with fd " << fd << ": " << errText);
237 errorReply(errText, message, context);
238 close(fd);
239 return true;
240 }
241 }
242 //
243 // Start listening for connections.
244 //
245 if (listen(fd, backlog) == -1)
246 {
247 const char* errText = strerror(errno);
248 DebugAdvisory("unexpected listen() error: " << errText);
249 errorReply(errText, message, context);
250 close(fd);
251 return true;
252 }
253 //
254 // Return a message now that we have a bound and listening server socket.
255 //
256 createdSocketReply(fd, message, context);
257 activeSockets[fd].state = LISTENING; // this creates the ActiveSocket object
258 return true;
259 }
260 IssueAdvisory("socket manager declined malformed message " << QUOTE(message) << '.');
261 return false;
262 }
263
264 bool
acceptClient(FreeDagNode * message,ObjectSystemRewritingContext & context)265 SocketManagerSymbol::acceptClient(FreeDagNode* message, ObjectSystemRewritingContext& context)
266 {
267 int socketId;
268 ActiveSocket* asp;
269 DagNode* socketName = message->getArgument(0);
270 if (getActiveSocket(socketName, socketId, asp))
271 {
272 ActiveSocket& as = *asp;
273 if (as.state == LISTENING)
274 {
275 sockaddr_in sockName;
276 socklen_t addrLen = sizeof(sockName);
277 int r;
278 do
279 r = accept(socketId, reinterpret_cast<sockaddr*>(&sockName), &addrLen);
280 while (r == -1 && errno == EINTR);
281 if (r >= 0)
282 {
283 if (setNonblockingFlag(r, message, context))
284 {
285 acceptedClientReply(inet_ntoa(sockName.sin_addr), r, message, context);
286 activeSockets[r].state = NOMINAL; // this creates the new ActiveSocket object
287 }
288 }
289 else if (errno == EAGAIN)
290 {
291 as.state = WAITING_TO_ACCEPT;
292 as.lastMessage.setNode(message);
293 as.originalContext = &context;
294 wantTo(READ, socketId);
295 }
296 else
297 {
298 //
299 // What should we do with a socket that we failed to accept on?
300 //
301 const char* errText = strerror(errno);
302 DebugAdvisory("unexpected accept() error: " << errText);
303 errorReply(errText, message, context);
304 }
305 return true;
306 }
307 IssueAdvisory(socketName << " declined message " << QUOTE(message) << '.');
308 return false;
309 }
310 IssueAdvisory("no socket to receive message " << QUOTE(message) << '.');
311 return false;
312 }
313
314 bool
send(FreeDagNode * message,ObjectSystemRewritingContext & context)315 SocketManagerSymbol::send(FreeDagNode* message, ObjectSystemRewritingContext& context)
316 {
317 int socketId;
318 ActiveSocket* asp;
319 Rope text;
320 DagNode* socketName = message->getArgument(0);
321 if (getActiveSocket(socketName, socketId, asp) &&
322 getText(message->getArgument(2), text) &&
323 !(text.empty()))
324 {
325 //ActiveSocket& as = activeSockets[socketId];
326 ActiveSocket& as = *asp;
327 if ((as.state & ~WAITING_TO_READ) == 0) // check that all the state bits other than WAITING_TO_READ are clear
328 {
329 //as.text = text;
330 as.textArray = text.makeZeroTerminatedString();
331 //as.unsent = as.text.c_str();
332 as.unsent = as.textArray;
333 as.nrUnsent = text.length(); // how to deal with empty message?
334 //
335 // Write some characters to the socket; we might get interrupted and have to restart.
336 //
337 ssize_t n;
338 do
339 n = write(socketId, as.unsent, as.nrUnsent);
340 while (n == -1 && errno == EINTR);
341
342 if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) // treat blocking situation as zero chars send
343 n = 0;
344 if (n >= 0)
345 {
346 as.nrUnsent -= n;
347 if (as.nrUnsent == 0) // done
348 {
349 sentMsgReply(message, context);
350 // clear as.text
351 delete [] as.textArray;
352 as.textArray = 0;
353 }
354 else // at least some characters pending
355 {
356 as.state |= WAITING_TO_WRITE;
357 as.lastMessage.setNode(message);
358 as.originalContext = &context;
359 as.unsent += n;
360 wantTo(WRITE, socketId);
361 }
362 }
363 else
364 {
365 const char* errText = strerror(errno);
366 DebugAdvisory("unexpected write() error : " << errText);
367 closedSocketReply(socketId, errText, message, context);
368 }
369 return true;
370 }
371 IssueAdvisory(socketName << " declined message " << QUOTE(message) << '.');
372 return false;
373 }
374 IssueAdvisory("no socket to receive message " << QUOTE(message) << '.');
375 return false;
376 }
377
378 bool
receive(FreeDagNode * message,ObjectSystemRewritingContext & context)379 SocketManagerSymbol::receive(FreeDagNode* message, ObjectSystemRewritingContext& context)
380 {
381 int socketId;
382 ActiveSocket* asp;
383 DagNode* socketName = message->getArgument(0);
384 if (getActiveSocket(socketName, socketId, asp))
385 {
386 //ActiveSocket& as = activeSockets[socketId];
387 ActiveSocket& as = *asp;
388 if ((as.state & ~WAITING_TO_WRITE) == 0) // check that all the state bits other than WAITING_TO_WRITE are clear
389 {
390 char buffer[READ_BUFFER_SIZE];
391 ssize_t n;
392 do
393 n = read(socketId, buffer, READ_BUFFER_SIZE);
394 while (n == -1 && errno == EINTR);
395
396 if (n > 0)
397 receivedMsgReply(buffer, n, message, context);
398 else
399 {
400 if (n == -1)
401 {
402 if (errno == EAGAIN)
403 {
404 as.state |= WAITING_TO_READ;
405 as.lastMessage.setNode(message);
406 as.originalContext = &context;
407 wantTo(READ, socketId);
408 }
409 else
410 {
411 const char* errText = strerror(errno);
412 DebugAdvisory("unexpected read() error: " << errText);
413 closedSocketReply(socketId, errText, message, context);
414 }
415 }
416 else
417 {
418 DebugAdvisory("read 0 bytes");
419 closedSocketReply(socketId, "", message, context);
420 }
421 }
422 return true;
423 }
424 else
425 DebugAdvisory("as.state = " << as.state);
426 IssueAdvisory(socketName << " declined message " << QUOTE(message) << '.');
427 return false;
428 }
429 IssueAdvisory("no socket to receive message " << QUOTE(message) << '.');
430 return false;
431 }
432
433 bool
closeSocket(FreeDagNode * message,ObjectSystemRewritingContext & context)434 SocketManagerSymbol::closeSocket(FreeDagNode* message, ObjectSystemRewritingContext& context)
435 {
436 int socketId;
437 ActiveSocket* asp;
438 DagNode* socketName = message->getArgument(0);
439 if (getActiveSocket(socketName, socketId, asp))
440 {
441 closedSocketReply(socketId, "", message, context);
442 return true;
443 }
444 IssueAdvisory("no socket to receive message " << QUOTE(message) << '.');
445 return false;
446 }
447
448 void
cleanUp(DagNode * objectId)449 SocketManagerSymbol::cleanUp(DagNode* objectId)
450 {
451 int socketId;
452 ActiveSocket* asp;
453 if (getActiveSocket(objectId, socketId, asp))
454 {
455 DebugAdvisory("cleaning up " << objectId);
456 close(socketId);
457 activeSockets.erase(socketId);
458 PseudoThread::clearFlags(socketId); // to avoid eventLoop() testing an invalid fd
459 }
460 else
461 CantHappen("no socket for " << objectId);
462 }
463