1 /*
2 * %CopyrightBegin%
3 *
4 * Copyright Ericsson AB 1998-2021. All Rights Reserved.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * %CopyrightEnd%
19 */
/*
 * Erlang port program to do the name service lookup for the erlang
 * distribution and inet part of the kernel.
 * A pool of subprocesses is kept, to each of which a pair of pipes is
 * connected.
 * The main process schedules requests among the different subprocesses
 * (created with fork()), to be able to handle as many requests as possible
 * simultaneously. The controlling erlang machine may request a "cancel",
 * in which case the process may be killed and restarted when the need arises.
 * The single numeric parameter to this program is the maximum port pool size,
 * which is the size of the bookkeeping array.
 *
 * Windows:
 * Instead of a pool of processes there is a pool of threads.
 * Communication is not done through pipes but via message queues between
 * the threads. The only "pipes" involved are the ones used for communicating
 * with Erlang.
 * Important note:
 * For unknown reasons, the combination of a thread doing blocking I/O on
 * a named pipe at the same time as another thread tries to resolve a hostname
 * may (with certain software configurations) block the gethostbyname call (!)
 * For that reason, standard input (and standard output) should be opened
 * in asynchronous mode (FILE_FLAG_OVERLAPPED), which has to be done by Erlang.
 * A special flag to open_port is used to work around this behaviour in winsock
 * and the threads doing read and write handle asynchronous I/O.
 * The ReadFile and WriteFile calls try to cope with both types of I/O, which
 * is why the code is not really as Microsoft describes "the right way to do
 * it" in their documentation. Important to note is that **there is no
 * supported way to retrieve the information if the HANDLE was opened with
 * FILE_FLAG_OVERLAPPED from the HANDLE itself**.
 *
 */
51
52 #ifdef HAVE_CONFIG_H
53 # include "config.h"
54 #endif
55
56 #include "erl_printf.h"
57
58 #ifdef WIN32
59
60 #define WIN32_LEAN_AND_MEAN
61 #include <winsock2.h>
62 #include <windows.h>
63 #include <ws2tcpip.h>
64 #include <process.h>
65 #include <stdio.h>
66 #include <stdlib.h>
67
68 /* These are not used even if they would exist which they should not */
69 #undef HAVE_GETIPNODEBYNAME
70 #undef HAVE_GETHOSTBYNAME2
71 #undef HAVE_GETIPNODEBYADDR
72
73 #else /* Unix */
74
75 #include <stdio.h>
76 #include <stdlib.h>
77 #include <string.h>
78 #include <unistd.h>
79 #include <stdarg.h>
80 #include <sys/types.h>
81 #include <sys/socket.h>
82 #include <sys/wait.h>
83 #include <netinet/in.h>
84 #include <arpa/inet.h>
85 #include <netdb.h>
86 #include <errno.h>
87 #include <signal.h>
88
89 #ifdef HAVE_SYS_TIME_H
90 #include <sys/time.h>
91 #else
92 #include <time.h>
93 #endif
94 #include <sys/times.h>
95
96 #ifndef RETSIGTYPE
97 #define RETSIGTYPE void
98 #endif
99
100 /* To simplify #ifdef code further down - select only one to be defined...
101 ** Use them in pairs - if one is broken do not trust its mate.
102 **/
103 #if defined(HAVE_GETADDRINFO) && defined(HAVE_GETNAMEINFO)
104 #undef HAVE_GETIPNODEBYNAME
105 #undef HAVE_GETIPNODEBYADDR
106 #undef HAVE_GETHOSTBYNAME2
107 #elif defined(HAVE_GETIPNODEBYNAME) && defined(HAVE_GETIPNODEBYADDR)
108 #undef HAVE_GETADDRINFO
109 #undef HAVE_GETNAMEINFO
110 #undef HAVE_GETHOSTBYNAME2
111 #else
112 #undef HAVE_GETIPNODEBYNAME
113 #undef HAVE_GETIPNODEBYADDR
114 #undef HAVE_GETADDRINFO
115 #undef HAVE_GETNAMEINFO
116 #endif
117
118 #endif /* !WIN32 */
119
/* Number of packet size bytes that precede every packet */
#define PACKET_BYTES 4
/* The packet size is read/written as a 32 bit integer; the Windows reader
 * takes one extra argument (see the read_int32 prototypes). */
#ifdef WIN32
#define READ_PACKET_BYTES(X,Y,Z) read_int32((X),(Y),(Z))
#else
#define READ_PACKET_BYTES(X,Y) read_int32((X),(Y))
#endif
#define PUT_PACKET_BYTES(X,Y) put_int32((X),(Y))

/* The serial numbers of the requests */
typedef int SerialType;

/* Parenthesized so that the negative literal stays one operand wherever
 * the macro is expanded. */
#define INVALID_SERIAL (-1)

/* The operations performed by this program */
typedef unsigned char OpType;

#define OP_GETHOSTBYNAME 1
#define OP_GETHOSTBYADDR 2
#define OP_CANCEL_REQUEST 3
#define OP_CONTROL 4

/* The protocol (IPV4/IPV6) */
typedef unsigned char ProtoType;

#define PROTO_IPV4 1
#define PROTO_IPV6 2

/* OP_CONTROL */
typedef unsigned char CtlType;
#define SETOPT_DEBUG_LEVEL 0

/* The unit of an IP address (0 == error, 4 == IPV4, 16 == IPV6) */
typedef unsigned char UnitType;

#define UNIT_ERROR 0
#define UNIT_IPV4 4
#define UNIT_IPV6 16

/* And the byte type */
typedef unsigned char AddrByte; /* Must be compatible with character
                                   datatype */
160
161 /*
162 * Marshalled format of request:
163 *{
164 * Serial: 32 bit big endian
165 * Op:8 bit [1,2,3]
166 * If op == 1 {
167 * Proto:8 bit [1,2]
168 * Str: Null terminated array of characters
169 * } Else if op == 2 {
170 * Proto:8 bit [1,2]
171 * If proto == 1 {
172 * B0..B3: 4 bytes, most significant first
173 * } Else (proto == 2) {
174 * B0..B15: 16 bytes, most significant first
175 * }
176 * }
177 * (No more if op == 3)
178 *}
179 * The request arrives as a packet, with 4 packet size bytes.
180 */
181
/* The main process unpacks the marshalled message and sends the data
 * to a suitable port process or, in the case of a close request, kills the
 * suitable port process. There is also a que of requests linked together,
 * for when all subprocesses are busy.
 */
187
188 typedef struct QueItem {
189 struct QueItem *next;
190 int req_size;
191 AddrByte request[1];
192 } QueItem; /* Variable size due to request's variable size */
193
194 QueItem *que_first;
195 QueItem *que_last;
196
#ifdef WIN32
/*
 * Message que used between threads on Windows, where it takes the place
 * of the pipes used on Unix. It is a linked list of QueItems; the exact
 * locking/signalling protocol lives in the *_mesq() functions.
 */
typedef struct mesq {
    HANDLE data_present;    /* NOTE(review): presumably the handle that
                               event_mesq() hands out - confirm */
    CRITICAL_SECTION crit;
    int shutdown;
    QueItem *first;
    QueItem *last;
} MesQ;

/* The ques handed to the writer and reader threads respectively */
MesQ *to_erlang;
MesQ *from_erlang;
#endif
209
210 /*
211 * Marshalled format of reply:
212 *{
213 * Serial: 32 bit big endian
214 * Unit: 8 bit, same as h_length or 0 for error
215 * if unit == 0 {
216 * Str: Null terminated character string explaining the error
217 * } else {
218 * Naddr: 32 bit big endian
219 * if unit = 4 {
220 * (B0..B3)0..(B0..B3)Naddr-1: Naddr*4 bytes most significant first
221 * } else if unit == 16 {
222 * (B0..B15)0..(B0..B15)Naddr-1: Naddr*16 bytes most significant first
223 * }
224 * Nnames: 32 bit big endian >= 1
225 * Name0: Null terminated string of characters
226 * Alias[0]..Alias[Nnames - 2]: Nnames - 1 Null terminated strings of chars
227 * }
228 *}
229 * Four packet size bytes prepended (big endian)
230 */
/* Internal error codes (note that the value 6 is not used) */
#define ERRCODE_NOTSUP          1
#define ERRCODE_HOST_NOT_FOUND  2
#define ERRCODE_TRY_AGAIN       3
#define ERRCODE_NO_RECOVERY     4
#define ERRCODE_NO_DATA         5
#define ERRCODE_NETDB_INTERNAL  7

/*
 * Each worker process is represented in the parent by a Worker struct,
 * whose life cycle is tracked with the states below.
 */

typedef unsigned int WorkerState;

#define WORKER_EMPTY    0 /* No process created */
#define WORKER_FREE     1 /* Living waiting process */
#define WORKER_BUSY     2 /* Living busy process */
#define WORKER_STALLED  3 /* Living cancelled process */

/* The timeout when killing a child process in seconds*/
#define CHILDWAIT_TMO 1
/* The domainname size_limit */
#define DOMAINNAME_MAX 258 /* 255 + Opcode + Protocol + Null termination */
254
255 typedef struct {
256 WorkerState state;
257 #ifdef WIN32
258 DWORD pid; /* 0 if unused */
259 MesQ *writeto; /* Message queues */
260 MesQ *readfrom;
261 #else
262 pid_t pid; /* -1 if unused */
263 int writeto, readfrom; /* Pipes */
264 #endif
265 SerialType serial;
266 AddrByte domain[DOMAINNAME_MAX];
267 QueItem *que_first;
268 QueItem *que_last;
269 int que_size;
270 } Worker;
271
272 int num_busy_workers;
273 int num_free_workers;
274 int num_stalled_workers;
275 int max_workers;
276 int greedy_threshold;
277 Worker *busy_workers; /* Workers doing any job that someone really is
278 interested in */
279 Worker *free_workers; /* Really free workers */
280 Worker *stalled_workers; /* May still deliver answers which we will
281 discard */
282 #define BEE_GREEDY() (num_busy_workers >= greedy_threshold)
283
284 static char *program_name;
285
286 static int debug_level;
287 #ifdef WIN32
288 static HANDLE debug_console_allocated = INVALID_HANDLE_VALUE;
289 #endif
290
291 #ifdef NODEBUG
292 #define DEBUGF(L,P) /* Nothing */
293 #else
294 #define DEBUGF(Level,Printf) do { if (debug_level >= (Level)) \
295 debugf Printf;} while(0)
296 #endif
297 #define ALLOC(Size) my_malloc(Size)
298 #define REALLOC(Old, Size) my_realloc((Old), (Size))
299 #define FREE(Ptr) free(Ptr)
300
301 #ifdef WIN32
302 #define WAKEUP_WINSOCK() do { \
303 char dummy_buff[100]; \
304 gethostname(dummy_buff,99); \
305 } while (0)
306 #endif
307
308 /* The internal prototypes */
309 static char *format_address(int siz, AddrByte *addr);
310 static void debugf(char *format, ...);
311 static void warning(char *format, ...);
312 static void fatal(char *format, ...);
313 static void *my_malloc(size_t size);
314 static void *my_realloc(void *old, size_t size);
315 static int get_int32(AddrByte *buff);
316 static void put_int32(AddrByte *buff, int value);
317 static int create_worker(Worker *pworker, int save_que);
318 static int map_netdb_error(int netdb_code);
319 #if defined(HAVE_GETADDRINFO) || defined(HAVE_GETNAMEINFO)
320 static int map_netdb_error_ai(int netdb_code);
321 #endif
322 static char *errcode_to_string(int errcode);
323 static size_t build_error_reply(SerialType serial, int errnum,
324 AddrByte **preply,
325 size_t *preply_size);
326 #ifdef HAVE_GETADDRINFO
327 static size_t build_reply_ai(SerialType serial, int, int,
328 struct addrinfo *,
329 AddrByte **preply, size_t *preply_size);
330 #endif
331 static size_t build_reply(SerialType serial, struct hostent *he,
332 AddrByte **preply, size_t *preply_size);
333 static int read_request(AddrByte **buff, size_t *buff_size);
334 static OpType get_op(AddrByte *buff);
335 static AddrByte *get_op_addr(AddrByte *buff);
336 static SerialType get_serial(AddrByte *buff);
337 static ProtoType get_proto(AddrByte *buff);
338 static CtlType get_ctl(AddrByte *buff);
339 static AddrByte *get_data(AddrByte *buff);
340 static int get_debug_level(AddrByte *buff);
341 static int relay_reply(Worker *pw);
342 static int ignore_reply(Worker *pw);
343 static void init_workers(int max);
344 static void kill_worker(Worker *pw);
345 static Worker *pick_worker(void);
346 static void kill_last_picked_worker(void);
347 static void stall_worker(SerialType serial);
348 static int handle_io_busy(int ndx);
349 static int handle_io_free(int ndx);
350 static int handle_io_stalled(int ndx);
351 static void check_que(void);
352 static void main_loop(void);
353 static void usage(char *unknown);
354 static void domaincopy(AddrByte *out,AddrByte *in);
355 static int domaineq(AddrByte *d1, AddrByte *d2);
356 static int get_domainname(AddrByte *inbuff, int insize, AddrByte *domainbuff);
357 static Worker *pick_worker_greedy(AddrByte *domainbuff);
358 static void restart_worker(Worker *w);
359 static void start_que_request(Worker *w) ;
360 #ifdef WIN32
361 static int read_int32(HANDLE fd, int *res, HANDLE ev);
362 static int read_exact(HANDLE fd, void *vbuff, DWORD nbytes, HANDLE ev);
363 static int write_exact(HANDLE fd, AddrByte *buff, DWORD len,HANDLE ev);
364 DWORD WINAPI worker_loop(void *v);
365 DWORD WINAPI reader(void *data);
366 DWORD WINAPI writer(void *data);
367 static int send_mes_to_worker(QueItem *m, Worker *pw);
368 BOOL create_mesq(MesQ **q);
369 BOOL enque_mesq(MesQ *q, QueItem *m);
370 BOOL deque_mesq(MesQ *q, QueItem **m);
371 BOOL close_mesq(MesQ *q);
372 HANDLE event_mesq(MesQ *q);
373 #else
374 static size_t read_int32(int fd, int *res);
375 static ssize_t read_exact(int fd, void *vbuff, size_t nbytes);
376 static int write_exact(int fd, AddrByte *buff, int len);
377 void reap_children(int ignored);
378 static void init_signals(void);
379 static void kill_all_workers(void);
380 static void close_all_worker_fds(void);
381 static int worker_loop(void);
382 static int fillin_reply(Worker *pw);
383 static int send_request_to_worker(AddrByte *pr, int rsize, Worker *pw);
384 #endif
385
#define ERL_DBG_LVL_ENV_VAR "ERL_INET_GETHOST_DEBUG"

/*
 * Returns the initial debug level taken from the environment variable
 * ERL_INET_GETHOST_DEBUG, or 0 when the variable is unset (or, on
 * Windows, empty or too long for the local buffer).
 *
 * The value is parsed as a decimal integer; leading digits are used and
 * text without leading digits yields 0.
 */
static int
get_env_debug_level(void)
{
    /* Accept both spellings: the rest of this file tests WIN32 */
#if defined(WIN32) || defined(__WIN32__)
    char value[21]; /* Enough for any 64-bit values */
    /* The name and buffer are narrow strings, so call the ANSI variant
     * explicitly instead of casting to the TCHAR based types. */
    DWORD sz = GetEnvironmentVariableA(ERL_DBG_LVL_ENV_VAR,
                                       value,
                                       (DWORD) sizeof(value));
    /* 0 means unset/empty; a result that does not fit in the buffer is
     * the required size, and then value[] holds nothing usable. */
    if (sz == 0 || sz >= sizeof(value))
        return 0;
#else
    char *value = getenv(ERL_DBG_LVL_ENV_VAR);
    if (!value)
        return 0;
#endif
    /* strtol, unlike atoi, has defined behaviour for out of range input */
    return (int) strtol(value, NULL, 10);
}
405
#ifdef WIN32
/*
 * Allocates a console for the process and opens its output ("CONOUT$")
 * for writing; the resulting handle is kept in debug_console_allocated.
 * Neither call is checked for failure.
 */
static void do_allocate_console(void)
{
    HANDLE console_out;

    AllocConsole();
    console_out = CreateFile ("CONOUT$", GENERIC_WRITE,
                              FILE_SHARE_WRITE, NULL,
                              OPEN_EXISTING,
                              FILE_ATTRIBUTE_NORMAL, NULL);
    debug_console_allocated = console_out;
}
#ifdef HARDDEBUG
DWORD WINAPI pseudo_worker_loop(void *v);
static void poll_gethost(int row);
#endif
#endif
420
421 /*
422 * Main
423 */
main(int argc,char ** argv)424 int main(int argc, char **argv)
425 {
426 int num_workers = 1;
427 char **ap = argv + 1;
428 int x;
429 int disable_greedy = 0;
430
431 program_name = *argv;
432 que_first = que_last = NULL;
433 debug_level = get_env_debug_level();
434 greedy_threshold = 0;
435
436 while (*ap) {
437 if (!strcmp(*ap, "-d")) {
438 ++debug_level;
439 } else if(!strcmp(*ap, "-g") && *(ap + 1)) {
440 ++ap;
441 x = atoi(*ap);
442 if (!x) {
443 usage(*ap);
444 } else {
445 greedy_threshold = x;
446 }
447 } else if(!strcmp(*ap, "-ng")) {
448 disable_greedy = 1;
449 } else {
450 x = atoi(*ap);
451 if (!x) {
452 usage(*ap);
453 } else {
454 num_workers = x;
455 }
456 }
457 ++ap;
458 }
459
460 #ifdef WIN32
461 if (num_workers > 60 || greedy_threshold > 60) {
462 usage("More than 60 workers on windows impossible!");
463 num_workers = 60;
464 greedy_threshold = 0;
465 }
466 #endif
467
468 if(!greedy_threshold) {
469 greedy_threshold = (3*num_workers)/4; /* 75% */
470 if (!greedy_threshold) {
471 greedy_threshold = num_workers;
472 }
473 }
474
475 if (disable_greedy) {
476 greedy_threshold = num_workers + 1;
477 }
478
479 #ifdef WIN32
480 {
481 WORD wr;
482 WSADATA wsa_data;
483 int wsa_error;
484 wr = MAKEWORD(2,0);
485
486 wsa_error = WSAStartup(wr,&wsa_data);
487 if (wsa_error) {
488 fatal("Could not open usable winsock library.");
489 }
490 if (LOBYTE(wsa_data.wVersion) != 2 || HIBYTE(wsa_data.wVersion) != 0) {
491 fatal("Could not open recent enough winsock library.");
492 }
493
494 if (debug_level >= 1) {
495 do_allocate_console();
496
497 DEBUGF(1,("num_workers = %d, greedy_threshold = %d, "
498 "debug_level = %d.",
499 num_workers, greedy_threshold, debug_level));
500 }
501 }
502 WAKEUP_WINSOCK(); /* Why on earth is this needed? */
503
504 #endif
505
506 init_workers(num_workers);
507
508 main_loop();
509 #ifndef WIN32
510 kill_all_workers();
511 #endif
512 return 0;
513 }
514
usage(char * unknown)515 static void usage(char *unknown)
516 {
517 fprintf(stderr,"%s: Unknown option \"%s\"\n"
518 "Usage: %s [-d [-d ...]] [-g <greedy threshold>] "
519 "[<number of workers>]\n",
520 program_name, unknown, program_name);
521 }
522
523 /*
524 * Main process main loop
525 */
526
handle_io_busy(int ndx)527 static int handle_io_busy(int ndx)
528 {
529 /* Probably an answer */
530 int res;
531 res = relay_reply(&busy_workers[ndx]);
532 if (res < 0) {
533 /* Bad worker */
534 if (busy_workers[ndx].que_size) {
535 restart_worker(&busy_workers[ndx]);
536 start_que_request(&busy_workers[ndx]);
537 return 0;
538 } else {
539 kill_worker(&busy_workers[ndx]);
540 --num_busy_workers;
541 busy_workers[ndx] = busy_workers[num_busy_workers];
542 }
543 return 1;
544 } else if (res == 0) {
545 /* Erlang has closed */
546 return -1;
547 } else {
548 if (busy_workers[ndx].que_size) {
549 start_que_request(&busy_workers[ndx]);
550 return 0;
551 }
552 /* The worker is no longer busy, it should be in the free list */
553 free_workers[num_free_workers] = busy_workers[ndx];
554 free_workers[num_free_workers].state = WORKER_FREE;
555 ++num_free_workers;
556 --num_busy_workers;
557 busy_workers[ndx] = busy_workers[num_busy_workers];
558 return 1;
559 }
560 }
561
handle_io_free(int ndx)562 static int handle_io_free(int ndx)
563 {
564 /* IO from a free worker means "kill me" */
565 DEBUGF(1,("Free worker[%ld] spontaneously died.",
566 (long) free_workers[ndx].pid));
567 kill_worker(&free_workers[ndx]);
568 --num_free_workers;
569 free_workers[ndx] = free_workers[num_free_workers];
570 return 1;
571 }
572
handle_io_stalled(int ndx)573 static int handle_io_stalled(int ndx)
574 {
575 int res;
576 res = ignore_reply(&stalled_workers[ndx]);
577 if (res <= 0) {
578 /* Bad worker */
579 kill_worker(&stalled_workers[ndx]);
580 --num_stalled_workers;
581 stalled_workers[ndx] = stalled_workers[num_stalled_workers];
582 return 1;
583 } else {
584 DEBUGF(3,("Ignoring reply from stalled worker[%ld].",
585 (long) stalled_workers[ndx].pid));
586 free_workers[num_free_workers] = stalled_workers[ndx];
587 free_workers[num_free_workers].state = WORKER_FREE;
588 ++num_free_workers;
589 --num_stalled_workers;
590 stalled_workers[ndx] = stalled_workers[num_stalled_workers];
591 return 1;
592 }
593 }
594
check_que(void)595 static void check_que(void)
596 {
597 /* Check if anything in the que can be handled */
598 Worker *cw;
599
600 while (que_first) {
601 QueItem *qi,*nxt;
602 qi = que_first;
603 nxt = qi->next; /* Need to save before it's getting put in another que
604 in threaded solution */
605 if ((cw = pick_worker()) == NULL) {
606 break;
607 }
608 #ifdef WIN32
609 {
610 SerialType save_serial = get_serial(que_first->request);
611 if (send_mes_to_worker(que_first, cw) != 0) {
612 kill_last_picked_worker();
613 continue;
614 }
615 cw->serial = save_serial;
616 }
617 #else
618 if (send_request_to_worker(que_first->request,
619 que_first->req_size, cw) != 0) {
620 /* Couldn't send request, kill the worker and retry */
621 kill_last_picked_worker();
622 continue;
623 }
624 cw->serial = get_serial(que_first->request);
625 #endif
626 /* Went well, lets deque */
627 que_first = nxt;
628 if (que_first == NULL) {
629 que_last = NULL;
630 }
631 DEBUGF(3,("Did deque serial %d, Que is %sempty",
632 get_serial(qi->request), (que_first) ? "not " : ""));
633 #ifndef WIN32
634 FREE(qi);
635 #endif
636 }
637 }
638
clean_que_of(SerialType s)639 static int clean_que_of(SerialType s)
640 {
641 QueItem **qi;
642 int i;
643
644 for(qi=&que_first;*qi != NULL &&
645 s != get_serial((*qi)->request); qi = &((*qi)->next))
646 ;
647 if(*qi != NULL) {
648 QueItem *r = *qi;
649 *qi = (*qi)->next;
650 FREE(r);
651 if(que_last == r) {
652 /* Lost the "last" pointer, should be very uncommon
653 if the que is not empty, so we simply do a traversal
654 to reclaim it. */
655 if (que_first == NULL) {
656 que_last = NULL;
657 } else {
658 for (que_last=que_first;que_last->next != NULL;
659 que_last = que_last->next)
660 ;
661 }
662 }
663 DEBUGF(3,("Removing serial %d from global que on request, "
664 "que %sempty",s, (que_first) ? "not " : ""));
665 return 1;
666 }
667 for (i = 0; i < num_busy_workers; ++i) {
668 for(qi=&(busy_workers[i].que_first);*qi != NULL &&
669 s != get_serial((*qi)->request); qi = &((*qi)->next))
670 ;
671 if(*qi != NULL) {
672 QueItem *r = *qi;
673 *qi = (*qi)->next;
674 FREE(r);
675 if(busy_workers[i].que_last == r) {
676 /* Lost the "last" pointer, should be very uncommon
677 if the que is not empty, so we simply do a traversal
678 to reclaim it. */
679 if (busy_workers[i].que_first == NULL) {
680 busy_workers[i].que_last = NULL;
681 if (busy_workers[i].que_size != 1) {
682 fatal("Worker que size counter incorrect, internal datastructure error.");
683 }
684 } else {
685 for (busy_workers[i].que_last = busy_workers[i].que_first;
686 busy_workers[i].que_last->next != NULL;
687 busy_workers[i].que_last = busy_workers[i].que_last->next)
688 ;
689 }
690 }
691 --(busy_workers[i].que_size);
692 DEBUGF(3,("Removing serial %d from worker[%ld] specific que "
693 "on request, que %sempty",
694 s, (long) busy_workers[i].pid,
695 (busy_workers[i].que_first) ? "not " : ""));
696 return 1;
697 }
698 }
699 return 0;
700 }
701
main_loop(void)702 static void main_loop(void)
703 {
704 AddrByte *inbuff = NULL;
705 int insize;
706 int i,w;
707 #ifdef WIN32
708 HANDLE handles[64];
709 DWORD num_handles;
710 DWORD index;
711 QueItem *qi;
712 #else
713 size_t inbuff_size = 0;
714 fd_set fds;
715 int max_fd;
716 #endif
717 int new_data;
718 int save_serial;
719 /* It's important that the free workers list is handled first */
720 Worker *workers[3] = {free_workers, busy_workers, stalled_workers};
721 int *wsizes[3] = {&num_free_workers, &num_busy_workers,
722 &num_stalled_workers};
723 int (*handlers[3])(int) = {&handle_io_free, &handle_io_busy,
724 &handle_io_stalled};
725 Worker *cw;
726 AddrByte domainbuff[DOMAINNAME_MAX];
727
728 #ifdef WIN32
729
730 {
731 DWORD dummy;
732 /* Create the reader and writer */
733 if ((!create_mesq(&to_erlang)) || (!create_mesq(&from_erlang))) {
734 fatal("Could not create message que! errno = %d.",GetLastError());
735 }
736 if (((HANDLE) _beginthreadex(NULL,0,writer,to_erlang,0,&dummy))
737 == NULL) {
738 fatal("Could not create writer thread! errno = %d.",GetLastError());
739 }
740 if (((HANDLE) _beginthreadex(NULL,0,reader,from_erlang,0,&dummy))
741 == NULL) {
742 fatal("Could not create reader thread! errno = %d.",GetLastError());
743 }
744 DEBUGF(4,("Created reader and writer threads."));
745 #ifdef HARDDEBUG
746 poll_gethost(__LINE__);
747 #endif
748 }
749 #endif
750
751 for(;;) {
752 #ifdef WIN32
753 num_handles = 0;
754 handles[num_handles++] = event_mesq(from_erlang);
755 for (w = 0; w < 3; ++w) {
756 for (i = 0; i < *wsizes[w]; ++i) {
757 handles[num_handles++] = event_mesq(workers[w][i].readfrom);
758 }
759 }
760
761 if ((index = WaitForMultipleObjects(num_handles, handles, FALSE, INFINITE))
762 == WAIT_FAILED) {
763 fatal("Could not WaitForMultpleObjects! errno = %d.",GetLastError());
764 }
765 w = 0;
766 index -= WAIT_OBJECT_0;
767
768 DEBUGF(4,("Got data on index %d.",index));
769 if (index > 0) {
770 if (((int)index - 1) < *wsizes[0]) {
771 (*handlers[0])(index - 1);
772 } else if (((int)index - 1) < ((*wsizes[0]) + (*wsizes[1]))) {
773 (*handlers[1])(index - 1 - (*wsizes[0]));
774 } else {
775 (*handlers[2])(index - 1 - (*wsizes[0]) - (*wsizes[1]));
776 }
777 }
778 new_data = (index == 0);
779 #else
780 max_fd = 0;
781 FD_ZERO(&fds);
782 FD_SET(0,&fds);
783 for (w = 0; w < 3; ++w) {
784 for (i = 0; i < *wsizes[w]; ++i) {
785 FD_SET(workers[w][i].readfrom,&fds);
786 if (workers[w][i].readfrom > max_fd) {
787 max_fd = workers[w][i].readfrom;
788 }
789 }
790 }
791 for (;;) {
792 if (select(max_fd + 1,&fds,NULL,NULL,NULL) < 0) {
793 if (errno == EINTR) {
794 continue;
795 } else {
796 fatal("Select failed (invalid internal structures?), "
797 "errno = %d.",errno);
798 }
799 }
800 break;
801 }
802 for (w = 0; w < 3; ++w) {
803 for (i = 0; i < *wsizes[w]; ++i) {
804 if (FD_ISSET(workers[w][i].readfrom, &fds)) {
805 int hres = (*handlers[w])(i);
806 if (hres < 0) {
807 return;
808 } else {
809 i -= hres; /* We'll retry this position, if hres == 1.
810 The position is usually
811 replaced with another worker,
812 a worker with
813 I/O usually changes state as we
814 use blocking file I/O */
815 }
816 }
817 }
818 }
819 new_data = FD_ISSET(0,&fds);
820
821 #endif
822
823 check_que();
824
825 /* Now check for new requests... */
826 if (new_data) { /* Erlang... */
827 OpType op;
828 #ifdef WIN32
829 if (!deque_mesq(from_erlang,&qi)) {
830 DEBUGF(1,("Erlang has closed."));
831 return;
832 }
833 insize = qi->req_size;
834 inbuff = qi->request;
835 DEBUGF(4,("Got data from erlang."));
836 DEBUGF(4,("OPeration == %d.",get_op(inbuff)));
837 #else
838 insize = read_request(&inbuff, &inbuff_size);
839 if (insize == 0) { /* Other errors taken care of in
840 read_request */
841 DEBUGF(1,("Erlang has closed."));
842 return;
843 }
844 #endif
845 op = get_op(inbuff);
846 if (op == OP_CANCEL_REQUEST) {
847 SerialType serial = get_serial(inbuff);
848 if (!clean_que_of(serial)) {
849 for (i = 0; i < num_busy_workers; ++i) {
850 if (busy_workers[i].serial == serial) {
851 if (busy_workers[i].que_size) {
852 restart_worker(&busy_workers[i]);
853 start_que_request(&busy_workers[i]);
854 } else {
855 stall_worker(i);
856 check_que();
857 }
858 break;
859 }
860 }
861 }
862 #ifdef WIN32
863 FREE(qi);
864 #endif
865 continue; /* New select */
866 } else if (op == OP_CONTROL) {
867 CtlType ctl;
868 SerialType serial = get_serial(inbuff);
869 if (serial != INVALID_SERIAL) {
870 fatal("Invalid serial: %d.", serial);
871 }
872 switch (ctl = get_ctl(inbuff)) {
873 case SETOPT_DEBUG_LEVEL:
874 {
875 int tmp_debug_level = get_debug_level(inbuff);
876 #ifdef WIN32
877 if (debug_console_allocated == INVALID_HANDLE_VALUE &&
878 tmp_debug_level > 0) {
879 DWORD res;
880 do_allocate_console();
881 WriteFile(debug_console_allocated,
882 "Hej\n",4,&res,NULL);
883 }
884 #endif
885 debug_level = tmp_debug_level;
886 DEBUGF(debug_level, ("debug_level = %d", debug_level));
887 for (w = 0; w < 3; ++w) {
888 for (i = 0; i < *wsizes[w]; i++) {
889 int res;
890 #ifdef WIN32
891 QueItem *m;
892 #endif
893 cw = &(workers[w][i]);
894 #ifdef WIN32
895 m = ALLOC(sizeof(QueItem) - 1 + qi->req_size);
896 memcpy(m->request, qi->request,
897 (m->req_size = qi->req_size));
898 m->next = NULL;
899 if ((res = send_mes_to_worker(m, cw)) != 0) {
900 FREE(m);
901 }
902 #else
903 res = send_request_to_worker(inbuff, insize, cw);
904 #endif
905 if (res != 0) {
906 kill_worker(cw);
907 (*wsizes[w])--;
908 *cw = workers[w][*wsizes[w]];
909 }
910 }
911 }
912 }
913 break;
914 default:
915 warning("Unknown control requested from erlang (%d), "
916 "message discarded.", (int) ctl);
917 break;
918 }
919 #ifdef WIN32
920 FREE(qi);
921 #endif
922 continue; /* New select */
923 } else {
924 ProtoType proto;
925 if (op != OP_GETHOSTBYNAME && op != OP_GETHOSTBYADDR) {
926 warning("Unknown operation requested from erlang (%d), "
927 "message discarded.", op);
928 #ifdef WIN32
929 FREE(qi);
930 #endif
931 continue;
932 }
933 if ((proto = get_proto(inbuff)) != PROTO_IPV4 &&
934 proto != PROTO_IPV6) {
935 warning("Unknown protocol requested from erlang (%d), "
936 "message discarded.", proto);
937 #ifdef WIN32
938 FREE(qi);
939 #endif
940 continue;
941 }
942 if (get_domainname(inbuff,insize,domainbuff) < 0) {
943 warning("Malformed message sent from erlang, no domain, "
944 "message discarded.", op);
945 #ifdef WIN32
946 FREE(qi);
947 #endif
948 continue;
949 }
950 }
951
952 if (BEE_GREEDY()) {
953 DEBUGF(4,("Beeing greedy!"));
954 if ((cw = pick_worker_greedy(domainbuff)) != NULL) {
955 /* Put it in the worker specific que if the
956 domainname matches... */
957 #ifndef WIN32
958 QueItem *qi = ALLOC(sizeof(QueItem) - 1 +
959 insize);
960 qi->req_size = insize;
961 memcpy(&(qi->request), inbuff, insize);
962 qi->next = NULL;
963 #endif
964 if (!cw->que_first) {
965 cw->que_first = cw->que_last = qi;
966 } else {
967 cw->que_last->next = qi;
968 cw->que_last = qi;
969 }
970 ++(cw->que_size);
971 continue;
972 }
973 /* Otherwise busyness as usual */
974 }
975
976 save_serial = get_serial(inbuff);
977
978 while ((cw = pick_worker()) != NULL) {
979 int res;
980 #ifdef WIN32
981 res = send_mes_to_worker(qi,cw);
982 #else
983 res = send_request_to_worker(inbuff, insize, cw);
984 #endif
985 if (res == 0) {
986 break;
987 } else {
988 kill_last_picked_worker();
989 }
990 }
991
992 if (cw == NULL) {
993 /* Insert into que */
994 #ifndef WIN32
995 QueItem *qi = ALLOC(sizeof(QueItem) - 1 +
996 insize);
997 qi->req_size = insize;
998 memcpy(&(qi->request), inbuff, insize);
999 qi->next = NULL;
1000 #endif
1001 if (!que_first) {
1002 que_first = que_last = qi;
1003 } else {
1004 que_last->next = qi;
1005 que_last = qi;
1006 }
1007 } else {
1008 cw->serial = save_serial;
1009 domaincopy(cw->domain, domainbuff);
1010 }
1011 }
1012 }
1013 }
1014
1015 /*
1016 * Main process worker administration
1017 */
1018
init_workers(int max)1019 static void init_workers(int max)
1020 {
1021 max_workers = max;
1022 num_busy_workers = 0;
1023 num_free_workers = 0;
1024 num_stalled_workers = 0;
1025
1026 busy_workers = ALLOC(sizeof(Worker) * max_workers);
1027 free_workers = ALLOC(sizeof(Worker) * max_workers);
1028 stalled_workers = ALLOC(sizeof(Worker) * max_workers);
1029 #ifndef WIN32
1030 init_signals();
1031 #endif
1032 }
1033
1034 #ifdef WIN32
/*
 * Windows: gives up the worker *pw by closing both of its message ques
 * and marking the entry as empty.
 */
static void kill_worker(Worker *pw)
{
    MesQ *to_worker = pw->writeto;
    MesQ *from_worker = pw->readfrom;

    /* Cannot really kill a thread in win32, have to just leave it to die */
    close_mesq(to_worker);
    close_mesq(from_worker);
    pw->state = WORKER_EMPTY;
}
1042 #else
/*
 * Unix: kills the worker process *pw.
 *
 * The process is sent SIGUSR1, after which its pipe is drained until end
 * of file is seen, waiting at most CHILDWAIT_TMO seconds for each round.
 * Failing to see the end of file only gives a warning. Both pipe ends are
 * then closed and the entry is marked as empty; the other fields are left
 * as they were.
 */
static void kill_worker(Worker *pw)
{
    fd_set fds;
    struct timeval tmo;
    int selret;
    static char buff[1024];

    DEBUGF(3,("Killing worker[%ld] with fd %d, serial %d",
              (long) pw->pid,
              (int) pw->readfrom,
              (int) pw->serial));
    kill(pw->pid, SIGUSR1);
    /* This is all just to check that the child died, not
       really necessary */
    for(;;) {
        FD_ZERO(&fds);
        FD_SET(pw->readfrom, &fds);
        tmo.tv_usec = 0;
        tmo.tv_sec = CHILDWAIT_TMO;
        selret = select(pw->readfrom+1, &fds, NULL, NULL, &tmo);
        if (selret < 0) {
            if (errno != EINTR) {
                warning("Unable to select on dying child file descriptor, "
                        "errno = %d.",errno);
                break;
            }
        } else if (selret == 0) {
            /* pid_t is printed as a long, like in the debug printouts */
            warning("Timeout waiting for child process to die, "
                    "ignoring child (pid = %ld).", (long) pw->pid);
            break;
        } else {
            ssize_t ret = read(pw->readfrom, buff, sizeof(buff));
            if (ret < 0) {
                if (errno != EINTR) {
                    warning("Child file descriptor not closed properly, "
                            "errno = %d", errno);
                    break;
                }
            } else if (ret == 0) {
                /* End of file, the child has closed its end */
                break;
            }
            /* Data was thrown away or the call was interrupted: continue */
        }
    }
    /* Waiting is done by signal handler... */
    close(pw->readfrom);
    close(pw->writeto);
    pw->state = WORKER_EMPTY;
    /* Leave rest as is... */
}
1093
kill_all_workers(void)1094 static void kill_all_workers(void)
1095 /* Emergency function, will not check that the children died... */
1096 {
1097 int i;
1098 for (i = 0; i < num_busy_workers; ++i) {
1099 kill(busy_workers[i].pid, SIGUSR1);
1100 }
1101 for (i = 0; i < num_free_workers; ++i) {
1102 kill(free_workers[i].pid, SIGUSR1);
1103 }
1104 for (i = 0; i < num_stalled_workers; ++i) {
1105 kill(stalled_workers[i].pid, SIGUSR1);
1106 }
1107 }
1108 #endif /* !WIN32 */
1109
pick_worker(void)1110 static Worker *pick_worker(void)
1111 {
1112 Worker tmp;
1113 if (num_free_workers > 0) {
1114 --num_free_workers;
1115 tmp = free_workers[num_free_workers];
1116 } else if (num_stalled_workers > 0) {
1117 /* "restart" the worker... */
1118 --num_stalled_workers;
1119 kill_worker(&(stalled_workers[num_stalled_workers]));
1120 if (create_worker(&tmp,0) < 0) {
1121 warning("Unable to create worker process, insufficient "
1122 "resources");
1123 return NULL;
1124 }
1125 } else {
1126 if (num_busy_workers == max_workers) {
1127 return NULL;
1128 }
1129 if (create_worker(&tmp,0) < 0) {
1130 warning("Unable to create worker process, insufficient "
1131 "resources");
1132 return NULL;
1133 }
1134 }
1135 /* tmp contains a worker now, make it busy and put it in the right
1136 array */
1137 tmp.state = WORKER_BUSY;
1138 busy_workers[num_busy_workers] = tmp;
1139 ++num_busy_workers;
1140 return &(busy_workers[num_busy_workers-1]);
1141 }
1142
pick_worker_greedy(AddrByte * domainbuff)1143 static Worker *pick_worker_greedy(AddrByte *domainbuff)
1144 {
1145 int i;
1146 int found = -1;
1147 for (i=0; i < num_busy_workers; ++i) {
1148 if (domaineq(busy_workers[i].domain, domainbuff)) {
1149 if ((found < 0) || (busy_workers[i].que_size <
1150 busy_workers[found].que_size)) {
1151 found = i;
1152 }
1153 }
1154 }
1155 if (found >= 0) {
1156 return &busy_workers[found];
1157 }
1158 return NULL;
1159 }
1160
/*
 * Kills the worker in slot w and creates a new one in the same slot.
 * create_worker() is called with save_que = 1, so the que_* fields of
 * the slot are left untouched. Failure to create the worker is fatal.
 */
static void restart_worker(Worker *w)
{
    int rc;

    kill_worker(w);
    rc = create_worker(w, 1);
    if (rc < 0) {
        fatal("Unable to create worker process, insufficient resources");
    }
}
1168
kill_last_picked_worker(void)1169 static void kill_last_picked_worker(void)
1170 {
1171 kill_worker( &(busy_workers[num_busy_workers-1]));
1172 --num_busy_workers;
1173 }
1174
1175 /*
1176 * Starts a request qued to a specific worker, check_que starts normally queued requests.
1177 * We expect a que here...
1178 */
start_que_request(Worker * w)1179 static void start_que_request(Worker *w)
1180 {
1181 QueItem *qi;
1182 SerialType save_serial;
1183 if (!w->que_first || !w->que_size) {
1184 fatal("Expected que'd requests but found none, "
1185 "internal datastructure corrupted!");
1186 }
1187 qi = w->que_first;
1188 w->que_first = w->que_first->next;
1189 if (!w->que_first) {
1190 w->que_last = NULL;
1191 }
1192 --(w->que_size);
1193 save_serial = get_serial(qi->request);
1194 #ifdef WIN32
1195 while (send_mes_to_worker(qi, w) != 0) {
1196 restart_worker(w);
1197 }
1198 #else
1199 while (send_request_to_worker(qi->request,
1200 qi->req_size, w) != 0) {
1201 restart_worker(w);
1202 }
1203 #endif
1204 w->serial = save_serial;
1205 DEBUGF(3,("Did deque serial %d from worker[%ld] specific que, "
1206 "Que is %sempty",
1207 get_serial(qi->request), (long) w->pid,
1208 (w->que_first) ? "not " : ""));
1209 #ifndef WIN32
1210 FREE(qi);
1211 #endif
1212 }
1213
1214 #ifndef WIN32
1215 /* Signal utilities */
sys_sigset(int sig,RETSIGTYPE (* func)(int))1216 static RETSIGTYPE (*sys_sigset(int sig, RETSIGTYPE (*func)(int)))(int)
1217 {
1218 struct sigaction act, oact;
1219
1220 sigemptyset(&act.sa_mask);
1221 act.sa_flags = 0;
1222 act.sa_handler = func;
1223 sigaction(sig, &act, &oact);
1224 return(oact.sa_handler);
1225 }
1226
1227
sys_sigblock(int sig)1228 static void sys_sigblock(int sig)
1229 {
1230 sigset_t mask;
1231
1232 sigemptyset(&mask);
1233 sigaddset(&mask, sig);
1234 sigprocmask(SIG_BLOCK, &mask, (sigset_t *)NULL);
1235 }
1236
sys_sigrelease(int sig)1237 static void sys_sigrelease(int sig)
1238 {
1239 sigset_t mask;
1240
1241 sigemptyset(&mask);
1242 sigaddset(&mask, sig);
1243 sigprocmask(SIG_UNBLOCK, &mask, (sigset_t *)NULL);
1244 }
1245
/*
 * Child signal handler.
 *
 * Collects every child that has terminated, without blocking
 * (waitpid() with WNOHANG), so that no zombies are left behind. The
 * signal number argument is not used.
 *
 * The handler may run between a failing system call and the test of
 * errno in the interrupted code, so errno is saved on entry and restored
 * on exit; waitpid() normally ends this loop by failing (e.g. no
 * children left), which would otherwise leak into the interrupted code.
 *
 * NOTE(review): DEBUGF is called from signal context; whether it is
 * async-signal-safe cannot be told from here - confirm.
 */
void reap_children(int ignored)
{
    int saved_errno = errno;
    int res;

    (void) ignored;
    sys_sigblock(SIGCHLD);
    for (;;) {
        while ((res = waitpid((pid_t)-1, NULL, WNOHANG)) > 0)
            ;
        /* Retry only on transient failures; anything else (including
           res == 0, children exist but none has exited) ends the loop */
        if (!(res < 0 && (errno == EAGAIN || errno == EINTR))) {
            DEBUGF(4,("reap_children: res = %d, errno = %d.",res,errno));
            break;
        }
    }
    sys_sigrelease(SIGCHLD);
    errno = saved_errno;
}
1261
init_signals(void)1262 static void init_signals(void)
1263 {
1264 sys_sigset(SIGCHLD,&reap_children); /* SIG_IGN would give same result
1265 on most (?) platforms. */
1266 sys_sigset(SIGPIPE, SIG_IGN);
1267 }
1268 #endif
1269
stall_worker(int ndx)1270 static void stall_worker(int ndx)
1271 {
1272 --num_busy_workers;
1273 stalled_workers[num_stalled_workers] = busy_workers[ndx];
1274 stalled_workers[num_stalled_workers].state = WORKER_STALLED;
1275 busy_workers[ndx] = busy_workers[num_busy_workers];
1276 DEBUGF(3, ("Stalled worker[%ld]",
1277 (long) stalled_workers[num_stalled_workers].pid));
1278 ++num_stalled_workers;
1279 }
1280
1281
1282 /*
1283 * Main loop message passing
1284 */
1285 #ifndef WIN32
/*
 * Reads one request from the main input (file descriptor 0) into *buff.
 *
 * The request is preceded by a length header read with
 * READ_PACKET_BYTES. *buff is (re)allocated when it is too small and
 * *buff_size is updated to the new capacity; *buff_size == 0 means that
 * nothing has been allocated yet.
 *
 * Returns the size of the request, or 0 if the header read returned 0.
 * Every other failure is reported through fatal().
 *
 * A request must be at least 5 bytes: the 4 byte serial followed by the
 * operation byte (see get_serial() and get_op()). The size is validated
 * before it is used for allocation or reading, so that a negative or
 * undersized length never reaches ALLOC/REALLOC/read_exact.
 */
static int read_request(AddrByte **buff, size_t *buff_size)
{
    int siz;
    int r;

    if ((r = READ_PACKET_BYTES(0,&siz)) != PACKET_BYTES) {
        if (r == 0) {
            return 0;
        } else {
            fatal("Unexpected end of file on main input, errno = %d",errno);
        }
    }

    if (siz < 5) {
        fatal("Unexpected message on main input, message size %d less "
              "than minimum.", siz);
    }

    if ((size_t) siz > *buff_size) {
        if (*buff_size == 0) {
            *buff = ALLOC((*buff_size = siz));
        } else {
            *buff = REALLOC(*buff, (*buff_size = siz));
        }
    }
    if (read_exact(0,*buff, siz) != siz) {
        fatal("Unexpected end of file on main input, errno = %d",errno);
    }
    return siz;
}
1315
1316 #endif /* !WIN32 */
1317
get_op(AddrByte * buff)1318 static OpType get_op(AddrByte *buff)
1319 {
1320 return (OpType) buff[4];
1321 }
1322
/* Returns a pointer to the operation byte of a request (offset 4). */
static AddrByte *get_op_addr(AddrByte *buff)
{
    return &buff[4];
}
1327
get_serial(AddrByte * buff)1328 static SerialType get_serial(AddrByte *buff)
1329 {
1330 return get_int32(buff);
1331 }
1332
get_proto(AddrByte * buff)1333 static ProtoType get_proto(AddrByte *buff)
1334 {
1335 return (ProtoType) buff[5];
1336 }
1337
get_ctl(AddrByte * buff)1338 static CtlType get_ctl(AddrByte *buff)
1339 {
1340 return (CtlType) buff[5];
1341 }
1342
/* Returns a pointer to the data part of a request, starting at offset 6. */
static AddrByte *get_data(AddrByte *buff)
{
    return &buff[6];
}
1347
/*
 * Returns the debug level carried by a control request: the 32 bit
 * integer at offset 6.
 */
static int get_debug_level(AddrByte *buff)
{
    AddrByte *level = buff + 6;

    return get_int32(level);
}
1352
1353 #ifdef WIN32
/*
 * Enqueues message m on the worker's request queue (pw->writeto).
 * Returns 0 on success; on failure a warning is issued and -1 returned.
 */
static int send_mes_to_worker(QueItem *m, Worker *pw)
{
    if (enque_mesq(pw->writeto, m)) {
        return 0;
    }
    warning("Unable to send to child process.");
    return -1;
}
1362 #else
/*
 * Writes a request of rsize bytes to the worker's pipe (pw->writeto),
 * preceded by a PACKET_BYTES long length header.
 *
 * Returns 0 on success. If either write reports an error (negative
 * result) a warning is issued and -1 is returned; the payload is not
 * written when the header write failed.
 */
static int send_request_to_worker(AddrByte *pr, int rsize, Worker *pw)
{
    AddrByte size_hdr[PACKET_BYTES];
    int sent;

    PUT_PACKET_BYTES(size_hdr, rsize);
    sent = (write_exact(pw->writeto, size_hdr, PACKET_BYTES) >= 0) &&
           (write_exact(pw->writeto, (AddrByte *) pr, rsize) >= 0);
    if (!sent) {
        warning("Unable to write to child process.");
        return -1;
    }
    return 0;
}
1378 #endif /* !WIN32 */
1379
1380 #ifdef WIN32
/*
 * Takes one reply from the worker's reply queue (pw->readfrom) and
 * passes it on to the to_erlang queue.
 *
 * Returns 1 when the reply was passed on. Returns 0 when there was no
 * reply to take, or when passing it on failed (the reply is then freed).
 */
static int relay_reply(Worker *pw)
{
    QueItem *reply;

    if (deque_mesq(pw->readfrom, &reply)) {
        if (enque_mesq(to_erlang, reply)) {
            return 1;
        }
        FREE(reply);
    }
    return 0;
}
1393
/*
 * Takes one reply from the worker's reply queue (pw->readfrom) and
 * throws it away. Returns 1 if a reply was discarded, 0 if none could be
 * taken from the queue.
 */
static int ignore_reply(Worker *pw)
{
    QueItem *dropped;

    if (deque_mesq(pw->readfrom, &dropped)) {
        FREE(dropped);
        return 1;
    }
    return 0;
}
1402
1403 #else
1404
1405 /* Static buffers used by the next three functions */
1406 static AddrByte *relay_buff = NULL;
1407 static int relay_buff_size = 0;
1408
fillin_reply(Worker * pw)1409 static int fillin_reply(Worker *pw)
1410 {
1411 int length;
1412
1413 if (READ_PACKET_BYTES(pw->readfrom, &length) != PACKET_BYTES) {
1414 warning("Malformed reply (header) from worker process %d.",
1415 pw->pid);
1416 return -1;
1417 }
1418
1419 if (relay_buff_size < (length + PACKET_BYTES)) {
1420 if (!relay_buff_size) {
1421 relay_buff =
1422 ALLOC((relay_buff_size = (length + PACKET_BYTES)));
1423 } else {
1424 relay_buff =
1425 REALLOC(relay_buff,
1426 (relay_buff_size = (length + PACKET_BYTES)));
1427 }
1428 }
1429 PUT_PACKET_BYTES(relay_buff, length);
1430 if (read_exact(pw->readfrom, relay_buff + PACKET_BYTES, length) !=
1431 length) {
1432 warning("Malformed reply (data) from worker process %d.", pw->pid);
1433 return -1;
1434 }
1435 return length;
1436 }
1437
/*
 * Reads one reply from worker pw and writes it, length header included,
 * to file descriptor 1.
 *
 * Returns the length of the reply data on success, -1 if the reply could
 * not be read from the worker and 0 if the write returned 0. A negative
 * write result is reported through fatal().
 */
static int relay_reply(Worker *pw)
{
    int written;
    int payload = fillin_reply(pw); /* Fills the file level relay_buff */

    if (payload < 0) {
        return -1;
    }
    written = write_exact(1, relay_buff, payload + PACKET_BYTES);
    if (written < 0) {
        fatal("Cannot write reply to erlang process, errno = %d.", errno);
    } else if (written == 0) {
        DEBUGF(1,("Erlang has closed write pipe."));
        return 0;
    }
    return payload;
}
1454
/*
 * Reads one reply from worker pw and discards it (it is only read into
 * the relay buffer, never written on). Returns what fillin_reply()
 * returns: the length of the reply data, or -1 on failure.
 */
static int ignore_reply(Worker *pw)
{
    int discarded = fillin_reply(pw);

    return discarded;
}
1459
1460 #endif /* !WIN32 */
1461
1462 /*
1463 * Domain name "parsing" and worker specific queing
1464 */
/*
 * Builds the "domain" key of a request in out. in points at the
 * operation byte of the request; the operation and protocol bytes are
 * copied first, followed by:
 *
 *   OP_GETHOSTBYNAME: the host name from its first '.' onwards (the
 *                     empty string if it has no dot), truncated so that
 *                     the whole key fits in DOMAINNAME_MAX bytes and
 *                     always NUL terminated.
 *   OP_GETHOSTBYADDR: all bytes of the address except the last one.
 *
 * Any other operation is reported through fatal().
 */
static void domaincopy(AddrByte *out, AddrByte *in)
{
    AddrByte *payload = out + 2;
    AddrByte *src = in + 2;

    out[0] = in[0];
    out[1] = in[1];

    if (out[0] == OP_GETHOSTBYNAME) {
        /* Skip the host part, keep ".domain" */
        for (; *src != '\0' && *src != '.'; ++src)
            ;
        strncpy((char*)payload, (char*)src, DOMAINNAME_MAX-2);
        payload[DOMAINNAME_MAX-3] = '\0';
        DEBUGF(4,("Saved domainname %s.", payload));
    } else if (out[0] == OP_GETHOSTBYADDR) {
        int prefix_len =
            ((out[1] == PROTO_IPV4) ? UNIT_IPV4 : UNIT_IPV6) - 1;

        memcpy(payload, src, prefix_len);
        DEBUGF(4, ("Saved domain address: %s.",
                   format_address(prefix_len, payload)));
    } else {
        fatal("Trying to copy buffer not containing valid domain, [%d,%d].",
              (int) out[0], (int) out[1]);
    }
}
1489
/*
 * Compares two domain keys as produced by domaincopy().
 *
 * Returns non-zero if they are equal: same operation and protocol bytes
 * and either the same name part (OP_GETHOSTBYNAME) or the same address
 * prefix (OP_GETHOSTBYADDR). Returns 0 if they differ. A key with any
 * other operation is reported through fatal().
 */
static int domaineq(AddrByte *d1, AddrByte *d2)
{
    int same_header = (d1[0] == d2[0]) && (d1[1] == d2[1]);

    if (!same_header) {
        return 0;
    }
    if (d1[0] == OP_GETHOSTBYNAME) {
        return strcmp((char*)d1+2,(char*)d2+2) == 0;
    }
    if (d1[0] == OP_GETHOSTBYADDR) {
        size_t prefix_len =
            ((d1[1] == PROTO_IPV4) ? UNIT_IPV4 : UNIT_IPV6) - 1;

        return memcmp(d1+2, d2+2, prefix_len) == 0;
    }
    fatal("Trying to compare buffers not containing valid domain, "
          "[%d,%d].",
          (int) d1[0], (int) d1[1]);
    return -1; /* Lint... */
}
1508
/*
 * Validates the request in inbuff (insize bytes) and, if it is well
 * formed, stores its domain key in domainbuff using domaincopy().
 *
 * A OP_GETHOSTBYNAME request is accepted when its data part contains a
 * NUL byte within insize. A OP_GETHOSTBYADDR request is accepted when
 * the size of its data part is exactly UNIT_IPV4 (for PROTO_IPV4) or
 * UNIT_IPV6 (for PROTO_IPV6).
 *
 * Returns 0 when the key was stored, -1 otherwise (including unknown
 * operations).
 */
static int get_domainname(AddrByte *inbuff, int insize, AddrByte *domainbuff)
{
    OpType op = get_op(inbuff);
    AddrByte *data = get_data(inbuff);
    int data_off = (int) (data - inbuff);

    if (op == OP_GETHOSTBYNAME) {
        int pos = data_off;

        while (pos < insize && inbuff[pos] != '\0') {
            ++pos;
        }
        if (pos >= insize) {
            DEBUGF(3, ("Could not pick valid domainname in "
                       "gethostbyname operation"));
            return -1;
        }
        domaincopy(domainbuff, get_op_addr(inbuff));
        return 0;
    }

    if (op == OP_GETHOSTBYADDR) {
        ProtoType proto = get_proto(inbuff);
        int addr_len = insize - data_off;
        int is_address = (proto == PROTO_IPV4 && addr_len == UNIT_IPV4) ||
                         (proto == PROTO_IPV6 && addr_len == UNIT_IPV6);

        if (!is_address) {
            DEBUGF(3, ("Could not pick valid domainname in gethostbyaddr "
                       "operation"));
            return -1;
        }
        /* An address buffer */
        domaincopy(domainbuff, get_op_addr(inbuff));
        return 0;
    }

    DEBUGF(2, ("Could not pick valid domainname because of "
               "invalid opcode %d.", (int) op));
    return -1;
}
1547
1548 /*
1549 * Worker subprocesses with utilities
1550 */
1551 #ifdef WIN32
/*
 * Creates a worker thread and fills in *pworker for it.
 *
 * Two message queues are created and handed to the thread in a heap
 * allocated array, which worker_loop() frees. pworker->writeto is the
 * queue the worker reads requests from and pworker->readfrom the queue
 * it writes replies to. pworker->pid is set to the thread id.
 *
 * If save_que is zero the request queue fields of *pworker are reset;
 * otherwise they are left untouched.
 *
 * Returns 0. All failures are reported through fatal().
 */
static int create_worker(Worker *pworker, int save_que)
{
    MesQ **thread_data = ALLOC(2*sizeof(MesQ *));
    HANDLE thread;
    DWORD tid;


    if (!create_mesq(thread_data)) {
        fatal("Could not create, pipes for subprocess, errno = %d",
              GetLastError());
    }
    if (!create_mesq(thread_data + 1)) {
        fatal("Could not create, pipes for subprocess, errno = %d",
              GetLastError());
    }
    /* Save those before the thread starts */
    pworker->writeto = thread_data[0];
    pworker->readfrom = thread_data[1];

    thread = (HANDLE) _beginthreadex(NULL, 0, worker_loop, thread_data, 0,
                                     &tid);
    if (thread == NULL) {
        fatal("Could not create thread errno = %d",
              GetLastError());
    } else {
        /* The handle returned by _beginthreadex is never used (the
           worker is identified by its thread id and its queues), but it
           stays open until closed explicitly. Closing it does not stop
           the thread; it only avoids leaking one handle per worker. */
        CloseHandle(thread);
    }
    pworker->pid = tid;
    pworker->state = WORKER_FREE;
    pworker->serial = INVALID_SERIAL;
    if (!save_que) {
        pworker->que_first = pworker->que_last = NULL;
        pworker->que_size = 0;
    }
    DEBUGF(3,("Created worker[%ld]", (long) pworker->pid));
    return 0;
}
1585
1586 #else
1587
/*
 * Forks a worker process connected to this process through two pipes
 * and fills in *pworker for it.
 *
 * Parent: keeps the write end of the request pipe (pworker->writeto)
 * and the read end of the reply pipe (pworker->readfrom), records the
 * child's pid and marks the worker WORKER_FREE. If save_que is zero the
 * request queue fields of *pworker are reset. Returns 0.
 *
 * Child: closes every descriptor belonging to the other workers, makes
 * the request pipe its file descriptor 0 and the reply pipe its file
 * descriptor 1, and runs worker_loop(), whose result is returned.
 *
 * Returns -1 after a warning if a pipe or the child could not be
 * created; everything opened so far is closed again.
 */
static int create_worker(Worker *pworker, int save_que)
{
    int from_child[2];  /* child writes [1], parent reads [0] */
    int to_child[2];    /* parent writes [1], child reads [0] */
    pid_t child;

    if (pipe(from_child)) {
        warning("Could not create, pipes for subprocess, errno = %d",
                errno);
        return -1;
    }

    if (pipe(to_child)) {
        warning("Could not create, pipes for subprocess, errno = %d",
                errno);
        close(from_child[0]);
        close(from_child[1]);
        return -1;
    }

    child = fork();
    if (child < 0) { /* failure */
        warning("Could not fork(), errno = %d",
                errno);
        close(from_child[0]);
        close(from_child[1]);
        close(to_child[0]);
        close(to_child[1]);
        return -1;
    }

    if (child == 0) { /* child */
        close(to_child[1]);
        close(from_child[0]);
        close_all_worker_fds();
        /* Make "fatal" not find any children */
        num_busy_workers = num_free_workers = num_stalled_workers = 0;
        if ((dup2(to_child[0], 0) < 0) || (dup2(from_child[1], 1) < 0)) {
            fatal("Worker could not dup2(), errno = %d",
                  errno);
            return -1; /* lint... */
        }
        close(to_child[0]);
        close(from_child[1]);
        signal(SIGCHLD, SIG_IGN);
        return worker_loop();
    }

    /* parent */
    close(from_child[1]);
    close(to_child[0]);
    pworker->writeto = to_child[1];
    pworker->readfrom = from_child[0];
    pworker->pid = child;
    pworker->state = WORKER_FREE;
    pworker->serial = INVALID_SERIAL;
    if (!save_que) {
        pworker->que_first = pworker->que_last = NULL;
        pworker->que_size = 0;
    }
    DEBUGF(3,("Created worker[%ld] with fd %d",
              (long) pworker->pid, (int) pworker->readfrom));
    return 0;
}
1646
close_all_worker_fds(void)1647 static void close_all_worker_fds(void)
1648 {
1649 int w,i;
1650 Worker *workers[3] = {free_workers, busy_workers, stalled_workers};
1651 int wsizes[3] = {num_free_workers, num_busy_workers,
1652 num_stalled_workers};
1653 for (w = 0; w < 3; ++w) {
1654 for (i = 0; i < wsizes[w]; ++i) {
1655 if (workers[w][i].state != WORKER_EMPTY) {
1656 close(workers[w][i].readfrom);
1657 close(workers[w][i].writeto);
1658 }
1659 }
1660 }
1661 }
1662
1663 #endif /* !WIN32 */
1664
1665 #ifdef WIN32
worker_loop(void * v)1666 DWORD WINAPI worker_loop(void *v)
1667 #else
1668 static int worker_loop(void)
1669 #endif
1670 {
1671 AddrByte *req = NULL;
1672 size_t req_size = 0;
1673 int this_size;
1674 AddrByte *reply = NULL;
1675 size_t reply_size = 0;
1676 size_t data_size;
1677
1678 #ifdef WIN32
1679 QueItem *m = NULL;
1680 MesQ *readfrom = ((MesQ **) v)[0];
1681 MesQ *writeto = ((MesQ **) v)[1];
1682 /* XXX:PaN */
1683 FREE(v);
1684 #endif
1685
1686 for(;;) {
1687 #ifdef HAVE_GETADDRINFO
1688 struct addrinfo *ai = NULL;
1689 #endif
1690 struct hostent *he = NULL;
1691 #ifdef HAVE_GETNAMEINFO
1692 struct sockaddr *sa = NULL;
1693 char name[NI_MAXHOST];
1694 #endif
1695 #if defined(HAVE_GETIPNODEBYNAME) || defined(HAVE_GETIPNODEBYADDR)
1696 int free_he = 0;
1697 #endif
1698 int error_num = 0;
1699 SerialType serial;
1700 OpType op;
1701 ProtoType proto;
1702 AddrByte *data;
1703
1704 #ifdef WIN32
1705 WaitForSingleObject(event_mesq(readfrom),INFINITE);
1706 DEBUGF(4,("Worker got data on message que."));
1707
1708 if(!deque_mesq(readfrom,&m)) {
1709 goto fail;
1710 }
1711 this_size = m->req_size;
1712 req = m->request;
1713 #else
1714 if (READ_PACKET_BYTES(0,&this_size) != PACKET_BYTES) {
1715 DEBUGF(2,("Worker got error/EOF while reading size, exiting."));
1716 exit(0);
1717 }
1718 if (this_size > req_size) {
1719 if (req == NULL) {
1720 req = ALLOC((req_size = this_size));
1721 } else {
1722 req = REALLOC(req, (req_size = this_size));
1723 }
1724 }
1725 if (read_exact(0, req, (size_t) this_size) != this_size) {
1726 DEBUGF(1,("Worker got EOF while reading data, exiting."));
1727 exit(0);
1728 }
1729 #endif
1730 /* Decode the request... */
1731 serial = get_serial(req);
1732 if (OP_CONTROL == (op = get_op(req))) {
1733 CtlType ctl;
1734 if (serial != INVALID_SERIAL) {
1735 DEBUGF(1, ("Worker got invalid serial: %d.", serial));
1736 exit(0);
1737 }
1738 switch (ctl = get_ctl(req)) {
1739 case SETOPT_DEBUG_LEVEL:
1740 debug_level = get_debug_level(req);
1741 DEBUGF(debug_level,
1742 ("Worker debug_level = %d.", debug_level));
1743 break;
1744 }
1745 continue;
1746 }
1747 proto = get_proto(req);
1748 data = get_data(req);
1749 DEBUGF(4,("Worker got request, op = %d, proto = %d, data = %s.",
1750 op,proto,data));
1751 /* Got a request, lets go... */
1752 switch (op) {
1753 case OP_GETHOSTBYNAME:
1754 switch (proto) {
1755
1756 #ifdef HAVE_IN6
1757 case PROTO_IPV6: { /* switch (proto) { */
1758 #ifdef HAVE_GETADDRINFO
1759 struct addrinfo hints;
1760
1761 memset(&hints, 0, sizeof(hints));
1762 hints.ai_flags = AI_CANONNAME;
1763 hints.ai_socktype = SOCK_STREAM;
1764 hints.ai_family = AF_INET6;
1765 DEBUGF(5, ("Starting getaddrinfo(%s, ...)", data));
1766 error_num = getaddrinfo((char *)data, NULL, &hints, &ai);
1767 DEBUGF(5,("getaddrinfo returned %d", error_num));
1768 if (error_num) {
1769 error_num = map_netdb_error_ai(error_num);
1770 }
1771 #elif defined(HAVE_GETIPNODEBYNAME) /*#ifdef HAVE_GETADDRINFO */
1772 DEBUGF(5,("Starting getipnodebyname(%s)",data));
1773 he = getipnodebyname(data, AF_INET6, 0, &error_num);
1774 if (he) {
1775 free_he = 1;
1776 error_num = 0;
1777 DEBUGF(5,("getipnodebyname(,AF_INET6,,) OK"));
1778 } else {
1779 DEBUGF(5,("getipnodebyname(,AF_INET6,,) error %d", error_num));
1780 error_num = map_netdb_error(error_num);
1781 }
1782 #elif defined(HAVE_GETHOSTBYNAME2) /*#ifdef HAVE_GETADDRINFO */
1783 DEBUGF(5,("Starting gethostbyname2(%s, AF_INET6)",data));
1784 he = gethostbyname2((char*)data, AF_INET6);
1785 if (he) {
1786 error_num = 0;
1787 DEBUGF(5,("gethostbyname2(, AF_INET6) OK"));
1788 } else {
1789 error_num = map_netdb_error(h_errno);
1790 DEBUGF(5,("gethostbyname2(, AF_INET6) error %d", h_errno));
1791 }
1792 #else
1793 error_num = ERRCODE_NOTSUP;
1794 #endif /*#ifdef HAVE_GETADDRINFO */
1795 } break;
1796 #endif /*ifdef HAVE_IN6 */
1797
1798 case PROTO_IPV4: { /* switch (proto) { */
1799 DEBUGF(5,("Starting gethostbyname(%s)",data));
1800 he = gethostbyname((char*)data);
1801 if (he) {
1802 error_num = 0;
1803 DEBUGF(5,("gethostbyname OK"));
1804 } else {
1805 error_num = map_netdb_error(h_errno);
1806 DEBUGF(5,("gethostbyname error %d", h_errno));
1807 }
1808 } break;
1809
1810 default: /* switch (proto) { */
1811 /* Not supported... */
1812 error_num = ERRCODE_NOTSUP;
1813 break;
1814 } /* switch (proto) { */
1815
1816 if (he) {
1817 data_size = build_reply(serial, he, &reply, &reply_size);
1818 #ifdef HAVE_GETIPNODEBYNAME
1819 if (free_he) {
1820 freehostent(he);
1821 }
1822 #endif
1823 #ifdef HAVE_GETADDRINFO
1824 } else if (ai) {
1825 data_size = build_reply_ai(serial, AF_INET6, 16, ai,
1826 &reply, &reply_size);
1827 freeaddrinfo(ai);
1828 #endif
1829 } else {
1830 data_size = build_error_reply(serial, error_num,
1831 &reply, &reply_size);
1832 }
1833 break; /* case OP_GETHOSTBYNAME: */
1834
1835 case OP_GETHOSTBYADDR: /* switch (op) { */
1836 switch (proto) {
1837 #ifdef HAVE_IN6
1838 case PROTO_IPV6: {
1839 #ifdef HAVE_GETNAMEINFO
1840 struct sockaddr_in6 *sin6;
1841 socklen_t salen = sizeof(*sin6);
1842
1843 sin6 = ALLOC(salen);
1844 #ifndef NO_SA_LEN
1845 sin6->sin6_len = salen;
1846 #endif
1847 sin6->sin6_family = AF_INET6;
1848 sin6->sin6_port = 0;
1849 memcpy(&sin6->sin6_addr, data, 16);
1850 sa = (struct sockaddr *)sin6;
1851 DEBUGF(5,("Starting getnameinfo(,,%s,16,,,)",
1852 format_address(16, data)));
1853 error_num = getnameinfo(sa, salen, name, sizeof(name),
1854 NULL, 0, NI_NAMEREQD);
1855 DEBUGF(5,("getnameinfo returned %d", error_num));
1856 if (error_num) {
1857 error_num = map_netdb_error_ai(error_num);
1858 sa = NULL;
1859 }
1860 #elif defined(HAVE_GETIPNODEBYADDR) /*#ifdef HAVE_GETNAMEINFO*/
1861 struct in6_addr ia;
1862 memcpy(ia.s6_addr, data, 16);
1863 DEBUGF(5,("Starting getipnodebyaddr(%s,16,AF_INET6,)",
1864 format_address(16, data)));
1865 he = getipnodebyaddr(&ia, 16, AF_INET6, &error_num);
1866 free_he = 1;
1867 if (! he) {
1868 DEBUGF(5,("getipnodebyaddr error %d", error_num));
1869 error_num = map_netdb_error(error_num);
1870 } else {
1871 DEBUGF(5,("getipnodebyaddr OK"));
1872 }
1873 #else /*#ifdef HAVE_GETNAMEINFO*/
1874 struct in6_addr ia;
1875 memcpy(ia.s6_addr, data, 16);
1876 DEBUGF(5,("Starting gethostbyaddr(%s,16,AF_INET6)",
1877 format_address(16, data)));
1878 he = gethostbyaddr((const char *) &ia, 16, AF_INET6);
1879 if (! he) {
1880 error_num = map_netdb_error(h_errno);
1881 DEBUGF(5,("gethostbyaddr error %d", h_errno));
1882 } else {
1883 DEBUGF(5,("gethostbyaddr OK"));
1884 }
1885 #endif /* #ifdef HAVE_GETNAMEINFO */
1886 } break; /* case PROTO_IPV6: { */
1887 #endif /* #ifdef HAVE_IN6 */
1888
1889 case PROTO_IPV4: { /* switch(proto) { */
1890 struct in_addr ia;
1891 memcpy(&ia.s_addr, data, 4); /* Alignment required... */
1892 DEBUGF(5,("Starting gethostbyaddr(%s,4,AF_INET)",
1893 format_address(4, data)));
1894 he = gethostbyaddr((const char *) &ia, 4, AF_INET);
1895 if (! he) {
1896 error_num = map_netdb_error(h_errno);
1897 DEBUGF(5,("gethostbyaddr error %d", h_errno));
1898 } else {
1899 DEBUGF(5,("gethostbyaddr OK"));
1900 }
1901 } break;
1902
1903 default:
1904 error_num = ERRCODE_NOTSUP;
1905 } /* switch(proto) { */
1906
1907 if (he) {
1908 data_size = build_reply(serial, he, &reply, &reply_size);
1909 #ifdef HAVE_GETIPNODEBYADDR
1910 if (free_he) {
1911 freehostent(he);
1912 }
1913 #endif
1914 #ifdef HAVE_GETNAMEINFO
1915 } else if (sa) {
1916 struct addrinfo res;
1917 memset(&res, 0, sizeof(res));
1918 res.ai_canonname = name;
1919 res.ai_addr = sa;
1920 res.ai_family = sa->sa_family;
1921 res.ai_next = NULL;
1922 data_size = build_reply_ai(serial, AF_INET6, 16, &res,
1923 &reply, &reply_size);
1924 free(sa);
1925 #endif
1926 } else {
1927 data_size = build_error_reply(serial, error_num,
1928 &reply, &reply_size);
1929 }
1930 break; /* case OP_GETHOSTBYADR: */
1931
1932 default:
1933 data_size = build_error_reply(serial, ERRCODE_NOTSUP,
1934 &reply, &reply_size);
1935 break;
1936 } /* switch (op) { */
1937
1938 #ifdef WIN32
1939 m = REALLOC(m, sizeof(QueItem) - 1 + data_size - PACKET_BYTES);
1940 m->next = NULL;
1941 m->req_size = data_size - PACKET_BYTES;
1942 memcpy(m->request,reply + PACKET_BYTES,data_size - PACKET_BYTES);
1943 if (!enque_mesq(writeto,m)) {
1944 goto fail;
1945 }
1946 m = NULL;
1947 #else
1948 /* expect no signals */
1949 if (write(1, reply, data_size) < 0)
1950 goto fail;
1951 #endif
1952 } /* for (;;) */
1953
1954 fail:
1955 #ifdef WIN32
1956 if (m != NULL) {
1957 FREE(m);
1958 }
1959 close_mesq(readfrom);
1960 close_mesq(writeto);
1961 if (reply) {
1962 FREE(reply);
1963 }
1964 #endif
1965 return 1;
1966 }
1967
map_netdb_error(int netdb_code)1968 static int map_netdb_error(int netdb_code)
1969 {
1970 switch (netdb_code) {
1971 #ifdef HOST_NOT_FOUND
1972 case HOST_NOT_FOUND:
1973 return ERRCODE_HOST_NOT_FOUND;
1974 #endif
1975 #ifdef TRY_AGAIN
1976 case TRY_AGAIN:
1977 return ERRCODE_TRY_AGAIN;
1978 #endif
1979 #ifdef NO_RECOVERY
1980 case NO_RECOVERY:
1981 return ERRCODE_NO_RECOVERY;
1982 #endif
1983 #if defined(NO_DATA) || defined(NO_ADDRESS)
1984 #ifdef NO_DATA
1985 case NO_DATA:
1986 #endif
1987 #ifdef NO_ADDRESS
1988 #if !defined(NO_DATA) || (NO_DATA != NO_ADDRESS)
1989 case NO_ADDRESS:
1990 #endif
1991 #endif
1992 return ERRCODE_NO_DATA;
1993 #endif
1994 default:
1995 return ERRCODE_NETDB_INTERNAL;
1996 }
1997 }
1998
1999 #if defined(HAVE_GETADDRINFO) || defined(HAVE_GETNAMEINFO)
/*
 * Translates an EAI_* error code, as returned by getaddrinfo() and
 * getnameinfo(), into an ERRCODE_* value:
 *
 *   EAI_AGAIN                          -> ERRCODE_TRY_AGAIN
 *   EAI_FAIL, EAI_NODATA, EAI_NONAME   -> ERRCODE_HOST_NOT_FOUND
 *   everything else                    -> ERRCODE_NETDB_INTERNAL
 */
static int map_netdb_error_ai(int netdb_code)
{
    switch(netdb_code) {
    case EAI_AGAIN:
        return ERRCODE_TRY_AGAIN;

    case EAI_FAIL:
#if defined(EAI_NODATA) && EAI_NODATA != EAI_NONAME
    case EAI_NODATA:
#endif
    case EAI_NONAME:
        return ERRCODE_HOST_NOT_FOUND;

#ifdef EAI_ADDRFAMILY
    case EAI_ADDRFAMILY:
#endif
    case EAI_BADFLAGS:
    case EAI_FAMILY:
    case EAI_MEMORY:
    case EAI_SERVICE:
    case EAI_SOCKTYPE:
    default:
        return ERRCODE_NETDB_INTERNAL;
    }
}
2031 #endif /* #if defined(HAVE_GETADDRINFO) || defined(HAVE_GETNAMEINFO) */
2032
2033
errcode_to_string(int errcode)2034 static char *errcode_to_string(int errcode)
2035 {
2036 switch (errcode) {
2037 case ERRCODE_NOTSUP:
2038 return "enotsup";
2039 case ERRCODE_HOST_NOT_FOUND:
2040 /*
2041 * I would preffer
2042 * return "host_not_found";
2043 * but have to keep compatibility with the old
2044 * inet_gethost's error codes...
2045 */
2046 return "notfound";
2047 case ERRCODE_TRY_AGAIN:
2048 return "try_again";
2049 case ERRCODE_NO_RECOVERY:
2050 return "no_recovery";
2051 case ERRCODE_NO_DATA:
2052 return "no_data";
2053 default:
2054 /*case ERRCODE_NETDB_INTERNAL:*/
2055 return "netdb_internal";
2056 }
2057 }
2058
/*
 * Builds an error reply in *preply, growing the buffer (and updating
 * *preply_size) when it is too small; *preply_size == 0 means that no
 * buffer has been allocated yet.
 *
 * Layout: length header (PACKET_BYTES bytes, counting what follows),
 * serial (4 bytes), a unit byte that is 0 to mark an error, and the NUL
 * terminated error string for errnum.
 *
 * Returns the total number of bytes in the reply, header included.
 */
static size_t build_error_reply(SerialType serial, int errnum,
                                AddrByte **preply,
                                size_t *preply_size)
{
    char *errstring = errcode_to_string(errnum);
    int string_need = strlen(errstring) + 1; /* a '\0' too */
    unsigned need = PACKET_BYTES + 4 /* Serial */ + 1 /* Unit */ +
                    string_need;
    AddrByte *base;

    if (*preply_size < need) {
        if (*preply_size == 0) {
            *preply = ALLOC(need);
        } else {
            *preply = REALLOC(*preply, need);
        }
        *preply_size = need;
    }

    base = *preply;
    PUT_PACKET_BYTES(base, need - PACKET_BYTES);
    put_int32(base + PACKET_BYTES, serial);
    base[PACKET_BYTES + 4] = (AddrByte) 0; /* 4 or 16 */
    memcpy(base + PACKET_BYTES + 5, errstring, string_need);
    return need;
}
2086
2087
2088
/*
 * Builds a successful reply from a struct hostent in *preply, growing
 * the buffer (and updating *preply_size) when it is too small;
 * *preply_size == 0 means that no buffer has been allocated yet.
 *
 * Layout: length header (PACKET_BYTES bytes, counting what follows),
 * serial (4 bytes), unit byte (he->h_length), number of addresses
 * (4 bytes), the addresses (unit bytes each), number of names (4 bytes)
 * and the NUL terminated names: h_name first, then every alias.
 *
 * Returns the total number of bytes in the reply, header included.
 */
static size_t build_reply(SerialType serial, struct hostent *he,
                          AddrByte **preply, size_t *preply_size)
{
    unsigned need;
    int strings_need;
    int num_strings = 1;   /* h_name is always present */
    int num_addresses = 0;
    int unit = he->h_length;
    int k;
    AddrByte *out;

    while (he->h_addr_list[num_addresses] != NULL) {
        ++num_addresses;
    }

    strings_need = strlen(he->h_name) + 1; /* 1 for null byte */
    if (he->h_aliases) {
        char **alias;

        for (alias = he->h_aliases; *alias != NULL; ++alias) {
            strings_need += strlen(*alias) + 1;
            ++num_strings;
        }
    }

    need = PACKET_BYTES +
        4 /* Serial */ + 1 /* Unit */ + 4 /* Naddr */ +
        (unit * num_addresses) /* Address bytes */ +
        4 /* Nnames */ + strings_need /* The name and alias strings */;

    if (*preply_size < need) {
        if (*preply_size == 0) {
            *preply = ALLOC(need);
        } else {
            *preply = REALLOC(*preply, need);
        }
        *preply_size = need;
    }

    out = *preply;
    PUT_PACKET_BYTES(out, need - PACKET_BYTES);
    out += PACKET_BYTES;
    put_int32(out, serial);
    out += 4;
    *out++ = (AddrByte) unit; /* 4 or 16 */

    put_int32(out, num_addresses);
    out += 4;
    for (k = 0; k < num_addresses; ++k, out += unit) {
        memcpy(out, he->h_addr_list[k], unit);
    }

    put_int32(out, num_strings);
    out += 4;
    {
        size_t len = strlen(he->h_name) + 1;

        memcpy(out, he->h_name, len);
        out += len;
    }
    for (k = 0; k < (num_strings - 1); ++k) {
        size_t len = strlen(he->h_aliases[k]) + 1;

        memcpy(out, he->h_aliases[k], len);
        out += len;
    }
    return need;
}
2147
2148 #if defined(HAVE_GETADDRINFO) || defined(HAVE_GETNAMEINFO)
/*
 * Builds a successful reply from a list of struct addrinfo in *preply,
 * growing the buffer (and updating *preply_size) when it is too small;
 * *preply_size == 0 means that no buffer has been allocated yet.
 *
 * Only entries of the given address family contribute: an entry's
 * address is included when its ai_addr is of that family, and its
 * canonical name when ai_canonname is set and ai_family matches.
 *
 * Layout: length header (PACKET_BYTES bytes, counting what follows),
 * serial (4 bytes), address length byte, number of addresses (4 bytes),
 * the addresses (addrlen bytes each), number of names (4 bytes) and the
 * NUL terminated names.
 *
 * Returns the total number of bytes in the reply, header included.
 */
static size_t build_reply_ai(SerialType serial,
                             int family, int addrlen,
                             struct addrinfo *res0,
                             AddrByte **preply, size_t *preply_size)
{
    struct addrinfo *cur;
    int name_count = 0;
    int addr_count = 0;
    AddrByte *out;
    int need = PACKET_BYTES +
        4 /* Serial */ + 1 /* addrlen */ +
        4 /* Naddr */ + 4 /* Nnames */;

    /* First pass: count the entries and the space they need */
    for (cur = res0; cur != NULL; cur = cur->ai_next) {
        if (cur->ai_addr != NULL && cur->ai_addr->sa_family == family) {
            ++addr_count;
            need += addrlen;
        }
        if (cur->ai_canonname != NULL && cur->ai_family == family) {
            ++name_count;
            need += strlen(cur->ai_canonname) + 1;
        }
    }

    if (*preply_size < need) {
        if (*preply_size == 0) {
            *preply = ALLOC(need);
        } else {
            *preply = REALLOC(*preply, need);
        }
        *preply_size = need;
    }

    out = *preply;
    PUT_PACKET_BYTES(out, need - PACKET_BYTES);
    out += PACKET_BYTES;
    put_int32(out, serial);
    out += 4;
    *out++ = (AddrByte) addrlen; /* 4 or 16 */

    /* Second pass: the addresses */
    put_int32(out, addr_count);
    out += 4;
    for (cur = res0; cur != NULL; cur = cur->ai_next) {
        const void *src;

        if (cur->ai_addr == NULL || cur->ai_addr->sa_family != family) {
            continue;
        }
        if (family == AF_INET) {
            src = &((struct sockaddr_in *)cur->ai_addr)->sin_addr;
        }
#ifdef AF_INET6
        else if (family == AF_INET6) {
            src = &((struct sockaddr_in6 *)cur->ai_addr)->sin6_addr;
        }
#endif
        else {
            src = cur->ai_addr->sa_data;
        }
        memcpy(out, src, addrlen);
        out += addrlen;
    }

    /* Third pass: the names */
    put_int32(out, name_count);
    out += 4;
    for (cur = res0; cur != NULL; cur = cur->ai_next) {
        size_t len;

        if (cur->ai_canonname == NULL || cur->ai_family != family) {
            continue;
        }
        len = strlen(cur->ai_canonname) + 1;
        memcpy(out, cur->ai_canonname, len);
        out += len;
    }
    return need;
}
2227
2228 #endif /* #if defined(HAVE_GETADDRINFO) || defined(HAVE_GETNAMEINFO) */
2229
2230
2231
2232 /*
2233 * Encode/decode/read/write
2234 */
2235
/*
 * Decodes the 4 bytes at b as a big-endian (most significant byte
 * first) 32 bit integer.
 */
static int get_int32(AddrByte *b)
{
    unsigned acc = 0;
    int i;

    for (i = 0; i < 4; ++i) {
        acc = (acc << 8) | (unsigned) b[i];
    }
    return (int) acc;
}
2245
/*
 * Encodes the low 32 bits of value into the 4 bytes at buff in
 * big-endian (most significant byte first) order.
 */
static void put_int32(AddrByte *buff, int value)
{
    unsigned bits = (unsigned) value;
    int i;

    for (i = 3; i >= 0; --i) {
        buff[i] = bits & 0xFF;
        bits >>= 8;
    }
}
2253 #ifdef WIN32
2254
/*
 * Reads 4 bytes from fd using read_exact() (ev is passed on to it) and
 * decodes them as a big-endian 32 bit integer into *res.
 *
 * Returns 4 on success, 0 if read_exact() returned 0 and -1 if it
 * returned a negative value; *res is only written on success.
 */
static int read_int32(HANDLE fd, int *res, HANDLE ev)
{
    AddrByte raw[4];
    int got = read_exact(fd, raw, 4, ev);

    if (got < 0) {
        return -1;
    }
    if (got == 0) {
        return 0;
    }
    *res = (int) ((((unsigned) raw[0]) << 24) |
                  (((unsigned) raw[1]) << 16) |
                  (((unsigned) raw[2]) << 8) |
                  ((unsigned) raw[3]));
    return 4;
}
2271 /*
2272 * The standard input is expected to be opened with FILE_FLAG_OVERLAPPED
2273 * but this code should handle both cases (although winsock might not).
2274 */
/*
 * Read exactly nbytes bytes from fd into vbuff, repeating ReadFile()
 * until the whole amount has arrived.
 *
 * Every ReadFile() call is given a freshly zeroed OVERLAPPED structure
 * whose event is ev (reset before each call). If the call reports
 * ERROR_IO_PENDING, the function waits on the event and fetches the
 * outcome with GetOverlappedResult().
 *
 * Returns nbytes on success, 0 on end of file (ERROR_BROKEN_PIPE or a
 * zero byte read) and -1 on any other error.
 */
static int read_exact(HANDLE fd, void *vbuff, DWORD nbytes, HANDLE ev)
{
    DWORD ret,got;
    BOOL stat;
    char *buff = vbuff;
    OVERLAPPED ov;
    DWORD err;

    got = 0;
    for(;;) {
        memset(&ov,0,sizeof(ov));
        ov.hEvent = ev;
        ResetEvent(ov.hEvent);
        stat = ReadFile(fd, buff, nbytes - got, &ret, &ov);
        if (!stat) {
            /*
             * Save the error code right away. Later calls (for example
             * debug output, if DEBUGF ends up in debugf(), which calls
             * WriteFile()) may overwrite the thread's last-error value.
             */
            err = GetLastError();
            if (err == ERROR_IO_PENDING) {
                DEBUGF(4,("Overlapped read, waiting for completion..."));
                WaitForSingleObject(ov.hEvent,INFINITE);
                stat = GetOverlappedResult(fd,&ov,&ret,TRUE);
                if (!stat) {
                    err = GetLastError();
                }
                DEBUGF(4,("Overlapped read, completed with status %d,"
                          " result %d",(int) stat,(int) ret));
            }
            if (!stat) {
                if (err == ERROR_BROKEN_PIPE) {
                    DEBUGF(1, ("End of file while reading from pipe."));
                    return 0;
                } else {
                    DEBUGF(1, ("Error while reading from pipe,"
                               " errno = %d",
                               (int) err));
                    return -1;
                }
            }
        } else {
            DEBUGF(4,("Read completed syncronously, result %d",(int) ret));
        }
        if (ret == 0) {
            DEBUGF(1, ("End of file detected as zero read from pipe."));
            return 0;
        }
        if (ret < nbytes - got) {
            /* Short read: advance and ask for the remainder. */
            DEBUGF(4,("Not all data read from pipe, still %d bytes to read.",
                      (int) (nbytes - (got + ret))));
            got += ret;
            buff += ret;
        } else {
            return nbytes;
        }
    }
}
2326 /*
2327 * Now, we actually expect a HANDLE opened with FILE_FLAG_OVERLAPPED,
2328 * but this code should handle both cases (although winsock
2329 * does not always..)
2330 */
/*
 * Write exactly len bytes from buff to fd, repeating WriteFile() until
 * everything has been written.
 *
 * Every WriteFile() call is given a freshly zeroed OVERLAPPED structure
 * whose event is ev (reset before each call). If the call reports
 * ERROR_IO_PENDING, the function waits on the event and fetches the
 * outcome with GetOverlappedResult().
 *
 * Returns len on success, 0 if the pipe is broken (ERROR_BROKEN_PIPE)
 * and -1 on any other error.
 */
static int write_exact(HANDLE fd, AddrByte *buff, DWORD len, HANDLE ev)
{
    DWORD res;
    BOOL stat;
    DWORD x = len;
    OVERLAPPED ov;
    DWORD err;

    for(;;) {
        memset(&ov,0,sizeof(ov));
        ov.hEvent = ev;
        ResetEvent(ov.hEvent);
        stat = WriteFile(fd,buff,x,&res,&ov);
        if (!stat) {
            /*
             * Save the error code right away. Later calls (for example
             * debug output, if DEBUGF ends up in debugf(), which calls
             * WriteFile()) may overwrite the thread's last-error value.
             */
            err = GetLastError();
            if (err == ERROR_IO_PENDING) {
                DEBUGF(4,("Overlapped write, waiting for competion..."));
                WaitForSingleObject(ov.hEvent,INFINITE);
                stat = GetOverlappedResult(fd,&ov,&res,TRUE);
                if (!stat) {
                    err = GetLastError();
                }
                DEBUGF(4,("Overlapped write, completed with status %d,"
                          " result %d",(int) stat,(int) res));
            }
            if (!stat) {
                if (err == ERROR_BROKEN_PIPE) {
                    return 0;
                } else {
                    return -1;
                }
            }
        } else {
            DEBUGF(4,("Write completed syncronously, result %d",(int) res));
        }

        if (res < x) {
            /* Microsoft states this can happen as HANDLE is a pipe... */
            DEBUGF(4,("Not all data written to pipe, still %d bytes to write.",
                      (int) (x - res)));
            x -= res;
            buff += res;
        } else {
            return len;
        }
    }
}
2374
reader(void * data)2375 DWORD WINAPI reader(void *data) {
2376 MesQ *mq = (MesQ *) data;
2377 QueItem *m;
2378 int siz;
2379 int r;
2380 HANDLE inp;
2381 int x = 0;
2382 HANDLE ev = CreateEvent(NULL, TRUE, FALSE, NULL);
2383
2384 inp = GetStdHandle(STD_INPUT_HANDLE);
2385 for (;;) {
2386 if ((r = READ_PACKET_BYTES(inp,&siz,ev)) != 4) {
2387 DEBUGF(1,("Erlang has closed (reading)"));
2388 exit(0);
2389 }
2390 DEBUGF(4,("Read packet of size %d from erlang",siz));
2391 m = ALLOC(sizeof(QueItem) - 1 + siz);
2392 if (read_exact(inp, m->request, siz,ev) != siz) {
2393 fatal("Unexpected end of file on main input, errno = %d",errno);
2394 }
2395 if (siz < 5) {
2396 fatal("Unexpected message on main input, message size %d less "
2397 "than minimum.");
2398 }
2399 m->req_size = siz;
2400 m->next = NULL;
2401 if (!enque_mesq(mq, m)) {
2402 fatal("Reader could not talk to main thread!");
2403 }
2404 }
2405 }
2406
writer(void * data)2407 DWORD WINAPI writer(void *data)
2408 {
2409 MesQ *mq = (MesQ *) data;
2410 QueItem *m;
2411 HANDLE outp = GetStdHandle(STD_OUTPUT_HANDLE);
2412 AddrByte hdr[PACKET_BYTES];
2413 HANDLE ev = CreateEvent(NULL, TRUE, FALSE, NULL);
2414
2415
2416 for (;;) {
2417 WaitForSingleObject(event_mesq(mq),INFINITE);
2418 if (!deque_mesq(mq, &m)) {
2419 fatal("Writer could not talk to main thread!");
2420 }
2421 PUT_PACKET_BYTES(hdr, m->req_size);
2422 if (write_exact(outp, hdr, 4, ev) != 4) {
2423 DEBUGF(1,("Erlang has closed (writing)"));
2424 exit(0);
2425 }
2426 if (write_exact(outp, m->request, m->req_size, ev) != m->req_size) {
2427 DEBUGF(1,("Erlang has closed (writing)"));
2428 exit(0);
2429 }
2430 FREE(m);
2431 }
2432 }
2433
2434
2435 #else
2436
read_int32(int fd,int * res)2437 static size_t read_int32(int fd, int *res)
2438 {
2439 AddrByte b[4];
2440 int r;
2441 if ((r = read_exact(fd,b,4)) < 0) {
2442 return -1;
2443 } else if (r == 0) {
2444 return 0;
2445 } else {
2446 *res = (unsigned) b[3];
2447 *res |= ((unsigned) b[2]) << 8;
2448 *res |= ((unsigned) b[1]) << 16;
2449 *res |= ((unsigned) b[0]) << 24;
2450 }
2451 return 4;
2452 }
2453
/*
 * Read exactly nbytes bytes from fd into vbuff, repeating read() after
 * short reads and after interruptions (EINTR).
 *
 * Returns nbytes on success, 0 if end of file is hit before any byte
 * has been read, and -1 on a read error or if end of file is hit after
 * part of the data has been read.
 */
static ssize_t read_exact(int fd, void *vbuff, size_t nbytes)
{
    char *dst = vbuff;
    size_t remaining = nbytes;

    for (;;) {
        ssize_t n = read(fd, dst, remaining);

        if (n < 0) {
            if (errno == EINTR) {
                continue;
            }
            DEBUGF(1, ("Error while reading from pipe,"
                       " errno = %d",
                       errno));
            return -1;
        }
        if (n == 0) {
            DEBUGF(1, ("End of file while reading from pipe."));
            /* Nothing consumed yet means a "normal" EOF. */
            return (remaining == nbytes) ? 0 : -1;
        }
        if ((size_t) n >= remaining) {
            return nbytes;
        }
        /* Short read: move on and ask for what is still missing. */
        remaining -= n;
        dst += n;
    }
}
2486
/*
 * Write len bytes from buff to fd, repeating write() after partial
 * writes and after interruptions (EINTR).
 *
 * Returns len on success, 0 if write() fails with EPIPE (or ENXIO
 * where that is defined) and -1 on any other error.
 */
static int write_exact(int fd, AddrByte *buff, int len)
{
    AddrByte *src = buff;
    int remaining = len;

    for (;;) {
        int n = write(fd, src, remaining);

        if (n == remaining) {
            return len;
        }
        if (n >= 0) {
            /* Blocking write that did not take everything; it costs
               very little to carry on with the rest. */
            remaining -= n;
            src += n;
            continue;
        }
        if (errno == EINTR) {
            continue;
        }
        if (errno == EPIPE) {
            return 0;
        }
#ifdef ENXIO
        if (errno == ENXIO) {
            return 0;
        }
#endif
        return -1;
    }
}
2519
2520 #endif /* !WIN32 */
2521
2522 /*
2523 * Debug and memory allocation
2524 */
2525
/*
 * Render the siz bytes at addr as text.
 *
 * Up to 4 bytes are printed in decimal separated by '.', 5 to 16 bytes
 * as two-digit hexadecimal separated by ':'. A size outside 0..16
 * yields the constant string "(unknown)".
 *
 * The result lives in a static buffer, so it is overwritten by the
 * next call and must not be freed.
 */
static char *format_address(int siz, AddrByte *addr)
{
    static char buff[50];
    char tmp[10];
    int dotted;

    /* A negative size would make the countdown loop below run far past
       the end of buff, so it is rejected together with oversized ones. */
    if (siz < 0 || siz > 16) {
        return "(unknown)";
    }
    dotted = (siz <= 4);
    *buff='\0';
    while(siz--) {
        erts_snprintf(tmp, sizeof(tmp), dotted ? "%d" : "%02x",
                      (int) *addr++);
        strcat(buff,tmp);
        if(siz) {
            strcat(buff, dotted ? "." : ":");
        }
    }
    return buff;
}
2553
debugf(char * format,...)2554 static void debugf(char *format, ...)
2555 {
2556 char buff[2048];
2557 char *ptr;
2558 va_list ap;
2559
2560 va_start(ap,format);
2561 #ifdef WIN32
2562 erts_snprintf(buff, sizeof(buff), "%s[%d] (DEBUG):",program_name,(int) GetCurrentThreadId());
2563 #else
2564 erts_snprintf(buff, sizeof(buff), "%s[%d] (DEBUG):",program_name,(int) getpid());
2565 #endif
2566 ptr = buff + strlen(buff);
2567 erts_vsnprintf(ptr,sizeof(buff)-strlen(buff)-2,format,ap);
2568 strcat(ptr,"\r\n");
2569 #ifdef WIN32
2570 if (debug_console_allocated != INVALID_HANDLE_VALUE) {
2571 DWORD res;
2572 WriteFile(debug_console_allocated,buff,strlen(buff),&res,NULL);
2573 }
2574 #else
2575 /* suppress warning with 'if' */
2576 if(write(2,buff,strlen(buff)))
2577 ;
2578 #endif
2579 va_end(ap);
2580 }
2581
warning(char * format,...)2582 static void warning(char *format, ...)
2583 {
2584 char buff[2048];
2585 char *ptr;
2586 va_list ap;
2587
2588 va_start(ap,format);
2589 erts_snprintf(buff, sizeof(buff), "%s[%d]: WARNING:",program_name, (int) getpid());
2590 ptr = buff + strlen(buff);
2591 erts_vsnprintf(ptr,sizeof(buff)-strlen(buff)-2,format,ap);
2592 strcat(ptr,"\r\n");
2593 #ifdef WIN32
2594 {
2595 DWORD res;
2596 WriteFile(GetStdHandle(STD_ERROR_HANDLE),buff,strlen(buff),&res,NULL);
2597 }
2598 #else
2599 /* suppress warning with 'if' */
2600 if(write(2,buff,strlen(buff)))
2601 ;
2602 #endif
2603 va_end(ap);
2604 }
2605
fatal(char * format,...)2606 static void fatal(char *format, ...)
2607 {
2608 char buff[2048];
2609 char *ptr;
2610 va_list ap;
2611
2612 va_start(ap,format);
2613 erts_snprintf(buff, sizeof(buff), "%s[%d]: FATAL ERROR:",program_name, (int) getpid());
2614 ptr = buff + strlen(buff);
2615 erts_vsnprintf(ptr,sizeof(buff)-strlen(buff)-2,format,ap);
2616 strcat(ptr,"\r\n");
2617 #ifdef WIN32
2618 {
2619 DWORD res;
2620 WriteFile(GetStdHandle(STD_ERROR_HANDLE),buff,strlen(buff),&res,NULL);
2621 }
2622 #else
2623 /* suppress warning with 'if' */
2624 if(write(2,buff,strlen(buff)))
2625 ;
2626 #endif
2627 va_end(ap);
2628 #ifndef WIN32
2629 kill_all_workers();
2630 #endif
2631 exit(1);
2632 }
2633
/*
 * malloc() wrapper that treats allocation failure as fatal.
 * Returns the pointer obtained from malloc(). If malloc() yields NULL,
 * fatal() is called, which terminates the program.
 */
static void *my_malloc(size_t size)
{
    void *mem = malloc(size);

    if (mem != NULL) {
        return mem;
    }
    fatal("Cannot allocate %u bytes of memory.", (unsigned) size);
    return NULL; /* lint... */
}
2643
/*
 * realloc() wrapper that treats allocation failure as fatal.
 * Returns the pointer obtained from realloc(old, size). If realloc()
 * yields NULL, fatal() is called, which terminates the program.
 */
static void *my_realloc(void *old, size_t size)
{
    void *mem = realloc(old, size);

    if (mem != NULL) {
        return mem;
    }
    fatal("Cannot reallocate %u bytes of memory from %p.",
          (unsigned) size, old);
    return NULL; /* lint... */
}
2654
2655 #ifdef WIN32
2656
create_mesq(MesQ ** q)2657 BOOL create_mesq(MesQ **q)
2658 {
2659 MesQ *tmp = ALLOC(sizeof(MesQ));
2660 tmp->data_present = CreateEvent(NULL, TRUE, FALSE,NULL);
2661 if (tmp->data_present == NULL) {
2662 free(tmp);
2663 return FALSE;
2664 }
2665 InitializeCriticalSection(&(tmp->crit)); /* Cannot fail */
2666 tmp->shutdown = 0;
2667 tmp->first = NULL;
2668 tmp->last = NULL;
2669 *q = tmp;
2670 return TRUE;
2671 }
2672
enque_mesq(MesQ * q,QueItem * m)2673 BOOL enque_mesq(MesQ *q, QueItem *m)
2674 {
2675 EnterCriticalSection(&(q->crit));
2676 if (q->shutdown) {
2677 LeaveCriticalSection(&(q->crit));
2678 return FALSE;
2679 }
2680 if (q->last == NULL) {
2681 q->first = q->last = m;
2682 } else {
2683 q->last->next = m;
2684 q->last = m;
2685 }
2686 m->next = NULL;
2687 if (!SetEvent(q->data_present)) {
2688 fprintf(stderr,"Fatal: Unable to signal event in %s:%d, last error: %d\n",
2689 __FILE__,__LINE__,GetLastError());
2690 exit(1); /* Unable to continue at all */
2691 }
2692 LeaveCriticalSection(&(q->crit));
2693 return TRUE;
2694 }
2695
deque_mesq(MesQ * q,QueItem ** m)2696 BOOL deque_mesq(MesQ *q, QueItem **m)
2697 {
2698 EnterCriticalSection(&(q->crit));
2699 if (q->first == NULL) { /* Usually shutdown from other end */
2700 ResetEvent(q->data_present);
2701 LeaveCriticalSection(&(q->crit));
2702 return FALSE;
2703 }
2704 *m = q->first;
2705 q->first = q->first->next;
2706 if (q->first == NULL) {
2707 q->last = NULL;
2708 ResetEvent(q->data_present);
2709 }
2710 (*m)->next = NULL;
2711 LeaveCriticalSection(&(q->crit));
2712 return TRUE;
2713 }
2714
close_mesq(MesQ * q)2715 BOOL close_mesq(MesQ *q)
2716 {
2717 QueItem *tmp;
2718 EnterCriticalSection(&(q->crit));
2719 if (!q->shutdown) {
2720 q->shutdown = TRUE;
2721 if (!SetEvent(q->data_present)) {
2722 fprintf(stderr,
2723 "Fatal: Unable to signal event in %s:%d, last error: %d\n",
2724 __FILE__,__LINE__,GetLastError());
2725 exit(1); /* Unable to continue at all */
2726 }
2727 LeaveCriticalSection(&(q->crit));
2728 return FALSE;
2729 }
2730 /* No one else is supposed to use this object any more */
2731 LeaveCriticalSection(&(q->crit));
2732 DeleteCriticalSection(&(q->crit));
2733 CloseHandle(q->data_present);
2734 tmp = q->first;
2735 while(tmp) {
2736 q->first = q->first->next;
2737 free(tmp);
2738 tmp = q->first;
2739 }
2740 free(q);
2741 return TRUE;
2742 }
2743
event_mesq(MesQ * q)2744 HANDLE event_mesq(MesQ *q)
2745 {
2746 return q->data_present;
2747 }
2748
2749 #ifdef HARDDEBUG
pseudo_worker_loop(void * v)2750 DWORD WINAPI pseudo_worker_loop(void *v)
2751 {
2752 HOSTENT *hep;
2753
2754 DEBUGF(1,("gethostbyname(\"ftp.funet.fi\") starting"));
2755 hep = gethostbyname("ftp.funet.fi");
2756
2757 DEBUGF(1,("gethostbyname(\"ftp.funet.fi\") -> %d OK",(int) hep));
2758 return 0;
2759 }
2760
poll_gethost(int row)2761 static void poll_gethost(int row) {
2762 HANDLE h;
2763 DWORD tid;
2764 h = (HANDLE) _beginthreadex(NULL, 0, pseudo_worker_loop, NULL, 0, &tid);
2765 if (h == NULL) {
2766 DEBUGF(1,("Failed to spawn pseudo worker (%d)...",row));
2767 } else {
2768 DEBUGF(1,("Waiting for pseudo worker (%d)", row));
2769 WaitForSingleObject(h,INFINITE);
2770 DEBUGF(1,("Done Waiting for pseudo worker (%d)", row));
2771 }
2772 }
2773 #endif
2774
2775 #endif /* WIN32 */
2776