1 /*===========================================================================*
2 psocket.c
3 ==============================================================================
4
5 low level communication facilities for Ppmtompeg parallel operation
6
7 By Bryan Henderson 2004.10.13. Contributed to the public domain by
8 its author.
9
10 ============================================================================*/
11
12 /* _ALL_SOURCE is needed on AIX to make the C library include the
13 socket services (e.g. define struct sockaddr)
14
15 Note that AIX standards.h actually sets feature declaration macros such
16 as _XOPEN_SOURCE, unless they are already set.
17 */
18 #define _ALL_SOURCE
19 #define __EXTENSIONS__
20 /* __EXTENSIONS__ is for a broken Sun C library (uname SunOS kosh 5.8
21 generic_108528-16 sun4u sparc). When you define _XOPEN_SOURCE,
22 it's vnode.h and resource.h fail to define some data types that they
23 need (e.g. timestruct_t). But with __EXTENSIONS__, they declare the
24 needed types anyway. Our #include <sys/socket.h> causes the broken
25 header files to get included.
26 */
27
28 /* On AIX, pm_config.h includes standards.h, which expects to be included
29 after feature declaration macros such as _XOPEN_SOURCE. So we include
30 pm_config.h as late as possible.
31 */
32
33 #include "pm_config.h" /* For POSIX_IS_IMPLIED */
34
35 #ifdef POSIX_IS_IMPLIED
36 /* The OpenBSD C library, at least, is broken in that when _XOPEN_SOURCE
37 is defined, its sys/socket.h refers to types "u_char", etc. but does
38 not define them. But it is also one of the C libraries where
39 POSIX is implied so that we don't need to define _XOPEN_SOURCE in order
40 to get the POSIX routines such as pclose() defined. So we circumvent
41 the problem by undefining _XOPEN_SOURCE:
42 */
43 #undef _XOPEN_SOURCE
44 #endif
45
46 #include <stdarg.h>
47 #include <netinet/in.h>
48 #include <unistd.h>
49 #include <netdb.h>
50 #include <stdio.h>
51 #include <errno.h>
52 #include <assert.h>
53 #include <sys/types.h>
54 #include <sys/socket.h>
55
56 #include "pm.h"
57 #include "pm_c_util.h"
58 #include "nstring.h"
59
60 #include "gethostname.h"
61
62 #include "psocket.h"
63
64
65 /* We use type socklenx_t where we should use socklen_t from the C
66 library, but older systems don't have socklen_t. And there doesn't
67 appear to be any way for the preprocessor to know whether it exists
68 or not. On older systems with no socklen_t, a message length is a
69 signed integer, but on modern systems, socklen_t is an unsigned
70 integer. Until we have some kind of build-time check for the existence
71 of socklen_t, we just use this socklenx_t, which is an unsigned
72 integer, and accept compiler warnings on older system.
73 -Bryan 2001.04.22.
74 */
75 typedef unsigned int socklenx_t;
76
77 #ifndef SOMAXCONN
78 #define SOMAXCONN 5
79 #endif
80
81
82
83 static void PM_GNU_PRINTF_ATTR(1,2)
errorExit(const char format[],...)84 errorExit(const char format[], ...) {
85
86 const char * const hostname = GetHostName();
87
88 va_list args;
89
90 va_start(args, format);
91
92 fprintf(stderr, "%s: FATAL ERROR. ", hostname);
93 pm_strfree(hostname);
94 vfprintf(stderr, format, args);
95 fputc('\n', stderr);
96
97 exit(1);
98
99 va_end(args);
100 }
101
102
103
104 static void
unmarshallInt(unsigned char const buffer[],int * const valueP)105 unmarshallInt(unsigned char const buffer[],
106 int * const valueP) {
107 /*----------------------------------------------------------------------------
108 Interpret a number which is formatted for one of our network packets.
109
110 To wit, 32 bit big-endian pure binary.
111 -----------------------------------------------------------------------------*/
112 union {
113 uint32_t value;
114 unsigned char bytes[4];
115 } converter;
116
117 memcpy(&converter.bytes, buffer, 4);
118
119 /* Note that contrary to what the C data types suggest, ntohl() is
120 a 32 bit converter, even if a C "long" is bigger than that.
121 */
122 *valueP = ntohl(converter.value);
123 }
124
125
126
127 static void
safeRead(int const fd,unsigned char * const buf,unsigned int const nbyte)128 safeRead(int const fd,
129 unsigned char * const buf,
130 unsigned int const nbyte) {
131 /*----------------------------------------------------------------------------
132 Safely read from file 'fd'. Keep reading until we get
133 'nbyte' bytes.
134 -----------------------------------------------------------------------------*/
135 unsigned int numRead;
136
137 numRead = 0; /* initial value */
138
139 while (numRead < nbyte) {
140 int const result = read(fd, &buf[numRead], nbyte-numRead);
141
142 if (result == -1)
143 errorExit("read (of %u bytes (total %u) ) returned "
144 "errno %d (%s)",
145 nbyte-numRead, nbyte, errno, strerror(errno));
146 else
147 numRead += result;
148 }
149 }
150
151
152
153 void
ReadBytes(int const fd,unsigned char * const buf,unsigned int const nbyte)154 ReadBytes(int const fd,
155 unsigned char * const buf,
156 unsigned int const nbyte) {
157
158 safeRead(fd, buf, nbyte);
159 }
160
161
162
163 void
ReadInt(int const socketFd,int * const valueP)164 ReadInt(int const socketFd,
165 int * const valueP) {
166
167 unsigned char buffer[4];
168
169 safeRead(socketFd, buffer, sizeof(buffer));
170
171 unmarshallInt(buffer, valueP);
172 }
173
174
175
176 static void
marshallInt(int const value,unsigned char (* const bufferP)[])177 marshallInt(int const value,
178 unsigned char (* const bufferP)[]) {
179 /*----------------------------------------------------------------------------
180 Put the number 'value' into the buffer at *bufferP in the form required
181 for one of our network packets.
182
183 To wit, 32 bit big-endian pure binary.
184 -----------------------------------------------------------------------------*/
185 union {
186 uint32_t value;
187 unsigned char bytes[4];
188 } converter;
189
190 unsigned char testbuffer[4];
191
192 /* Note that contrary to what the C data types suggest, htonl() is
193 a 32 bit converter, even if a C "long" is bigger than that.
194 */
195 converter.value = htonl(value);
196
197 (*bufferP)[0] = 7;
198 memcpy(testbuffer, &converter.bytes, 4);
199 memcpy(*bufferP, &converter.bytes, 4);
200 }
201
202
203
204 static void
safeWrite(int const fd,unsigned char * const buf,unsigned int const nbyte)205 safeWrite(int const fd,
206 unsigned char * const buf,
207 unsigned int const nbyte) {
208 /*----------------------------------------------------------------------------
209 Safely write to file 'fd'. Keep writing until we write 'nbyte'
210 bytes.
211 -----------------------------------------------------------------------------*/
212 unsigned int numWritten;
213
214 numWritten = 0; /* initial value */
215
216 while (numWritten < nbyte) {
217 int const result = write(fd, &buf[numWritten], nbyte-numWritten);
218
219 if (result == -1)
220 errorExit("write (of %u bytes (total %u) ) returned "
221 "errno %d (%s)",
222 nbyte-numWritten, nbyte, errno, strerror(errno));
223 numWritten += result;
224 }
225 }
226
227
228
229 void
WriteBytes(int const fd,unsigned char * const buf,unsigned int const nbyte)230 WriteBytes(int const fd,
231 unsigned char * const buf,
232 unsigned int const nbyte) {
233
234 safeWrite(fd, buf, nbyte);
235 }
236
237
238
239 void
WriteInt(int const socketFd,int const value)240 WriteInt(int const socketFd,
241 int const value) {
242
243 unsigned char buffer[4];
244
245 marshallInt(value, &buffer);
246
247 safeWrite(socketFd, buffer, sizeof(buffer));
248 }
249
250
251
252 void
ConnectToSocket(const char * const machineName,int const portNum,struct hostent ** const hostEnt,int * const socketFdP,const char ** const errorP)253 ConnectToSocket(const char * const machineName,
254 int const portNum,
255 struct hostent ** const hostEnt,
256 int * const socketFdP,
257 const char ** const errorP) {
258 /*----------------------------------------------------------------------------
259 Create a socket and connect it to the specified TCP endpoint.
260
261 That endpoint is fundamentally defined by 'machineName' and
262 'portNum', but *hostEnt is the address of a host entry that caches
263 the results of the host name lookup. If *hostEnt is non-null, we
264 use it. If *hostEnt is NULL, we look up the information and update
265 **hostEnt.
266 -----------------------------------------------------------------------------*/
267 int rc;
268
269 *errorP = NULL; /* initial value */
270
271 if ((*hostEnt) == NULL) {
272 (*hostEnt) = gethostbyname(machineName);
273 if ((*hostEnt) == NULL)
274 pm_asprintf(errorP, "Couldn't get host by name (%s)", machineName);
275 }
276 if (!*errorP) {
277 rc = socket(AF_INET, SOCK_STREAM, 0);
278 if (rc < 0)
279 pm_asprintf(errorP, "socket() failed with errno %d (%s)",
280 errno, strerror(errno));
281 else {
282 int const socketFd = rc;
283
284 int rc;
285 unsigned short tempShort;
286 struct sockaddr_in nameEntry;
287
288 nameEntry.sin_family = AF_INET;
289 memset((void *) nameEntry.sin_zero, 0, 8);
290 memcpy((void *) &(nameEntry.sin_addr.s_addr),
291 (void *) (*hostEnt)->h_addr_list[0],
292 (size_t) (*hostEnt)->h_length);
293 tempShort = portNum;
294 nameEntry.sin_port = htons(tempShort);
295
296 rc = connect(socketFd, (struct sockaddr *) &nameEntry,
297 sizeof(struct sockaddr));
298
299 if (rc != 0)
300 pm_asprintf(errorP,
301 "connect() to host '%s', port %d failed with "
302 "errno %d (%s)",
303 machineName, portNum, errno, strerror(errno));
304 else {
305 *errorP = NULL;
306 *socketFdP = socketFd;
307 }
308 if (*errorP)
309 close(socketFd);
310 }
311 }
312 }
313
314
315
316 static bool
portInUseErrno(int const testErrno)317 portInUseErrno(int const testErrno) {
318 /*----------------------------------------------------------------------------
319 Return TRUE iff 'testErrno' is what a bind() would return if one requested
320 a port number that is unavailable (but other port numbers might be).
321 -----------------------------------------------------------------------------*/
322 bool retval;
323
324 switch (testErrno) {
325 case EINVAL:
326 case EADDRINUSE:
327 case EADDRNOTAVAIL:
328 retval = TRUE;
329 break;
330 default:
331 retval = FALSE;
332 }
333 return retval;
334 }
335
336
337
338 static void
bindToUnusedPort(int const socketFd,unsigned short * const portNumP,const char ** const errorP)339 bindToUnusedPort(int const socketFd,
340 unsigned short * const portNumP,
341 const char ** const errorP) {
342
343 bool foundPort;
344 unsigned short trialPortNum;
345
346 *errorP = NULL; /* initial value */
347
348 for (foundPort = FALSE, trialPortNum = 2048;
349 !foundPort && trialPortNum < 16384 && !*errorP;
350 ++trialPortNum) {
351
352 struct sockaddr_in nameEntry;
353 int rc;
354
355 memset((char *) &nameEntry, 0, sizeof(nameEntry));
356 nameEntry.sin_family = AF_INET;
357 nameEntry.sin_port = htons(trialPortNum);
358
359 rc = bind(socketFd, (struct sockaddr *) &nameEntry,
360 sizeof(struct sockaddr));
361
362 if (rc == 0) {
363 foundPort = TRUE;
364 *portNumP = trialPortNum;
365 } else if (!portInUseErrno(errno))
366 pm_asprintf(errorP, "bind() of TCP port number %hu failed "
367 "with errno %d (%s)",
368 trialPortNum, errno, strerror(errno));
369 }
370
371 if (!*errorP && !foundPort)
372 pm_asprintf(errorP, "Unable to find a free port. Every TCP port "
373 "in the range 2048-16383 is in use");
374 }
375
376
377
378 void
CreateListeningSocket(int * const socketP,int * const portNumP,const char ** const errorP)379 CreateListeningSocket(int * const socketP,
380 int * const portNumP,
381 const char ** const errorP) {
382 /*----------------------------------------------------------------------------
383 Create a TCP socket and bind it to the first unused port number we
384 can find.
385
386 Return as *socketP a file handle for the socket (on which Caller can
387 listen()), and as *portNumP the TCP port number (to which Caller's
388 partner can connect).
389 -----------------------------------------------------------------------------*/
390 int rc;
391
392 rc = socket(AF_INET, SOCK_STREAM, 0);
393 if (rc < 0)
394 pm_asprintf(errorP,
395 "Unable to create socket. "
396 "socket() failed with errno %d (%s)",
397 errno, strerror(errno));
398 else {
399 int const socketFd = rc;
400
401 unsigned short portNum;
402
403 *socketP = socketFd;
404
405 bindToUnusedPort(socketFd, &portNum, errorP);
406 if (!*errorP) {
407 int rc;
408
409 *portNumP = portNum;
410
411 /* would really like to wait for 1+numMachines machines,
412 but this is max allowable, unfortunately
413 */
414 rc = listen(socketFd, SOMAXCONN);
415 if (rc != 0)
416 pm_asprintf(errorP, "Unable to listen on TCP socket. "
417 "listen() fails with errno %d (%s)",
418 errno, strerror(errno));
419 }
420 if (*errorP)
421 close(socketFd);
422 }
423 }
424
425
426
427 void
AcceptConnection(int const listenSocketFd,int * const connectSocketFdP,const char ** const errorP)428 AcceptConnection(int const listenSocketFd,
429 int * const connectSocketFdP,
430 const char ** const errorP) {
431
432 struct sockaddr otherSocket;
433 socklenx_t otherSize;
434 /* This is an ugly dual-meaning variable. As input to accept(),
435 it is the storage size of 'otherSocket'. As output, it is the
436 data length of 'otherSocket'.
437 */
438 int rc;
439
440 otherSize = sizeof(otherSocket);
441
442 rc = accept(listenSocketFd, &otherSocket, &otherSize);
443
444 if (rc < 0)
445 pm_asprintf(errorP, "accept() failed with errno %d (%s). ",
446 errno, strerror(errno));
447 else {
448 *connectSocketFdP = rc;
449 *errorP = NULL;
450 }
451 }
452
453
454
455