1 /** @file
2  *  @brief RemoteConnection class used by the remote backend.
3  */
4 /* Copyright (C) 2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2017 Olly Betts
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19  */
20 
21 #include <config.h>
22 
23 #include "remoteconnection.h"
24 
25 #include <xapian/error.h>
26 
27 #include "safefcntl.h"
28 #include "safeunistd.h"
29 
30 #ifdef HAVE_POLL_H
31 # include <poll.h>
32 #else
33 # include "safesysselect.h"
34 #endif
35 
36 #include <algorithm>
37 #include <cerrno>
38 #include <climits>
39 #include <cstdint>
40 #include <string>
41 #ifdef __WIN32__
42 # include <type_traits>
43 #endif
44 
45 #include "debuglog.h"
46 #include "fd.h"
47 #include "filetests.h"
48 #include "noreturn.h"
49 #include "omassert.h"
50 #include "overflow.h"
51 #include "posixy_wrapper.h"
52 #include "realtime.h"
53 #include "length.h"
54 #include "socket_utils.h"
55 
56 using namespace std;
57 
58 #define CHUNKSIZE 4096
59 
60 XAPIAN_NORETURN(static void throw_database_closed());
61 static void
throw_database_closed()62 throw_database_closed()
63 {
64     throw Xapian::DatabaseClosedError("Database has been closed");
65 }
66 
67 XAPIAN_NORETURN(static void throw_network_error_insane_message_length());
68 static void
throw_network_error_insane_message_length()69 throw_network_error_insane_message_length()
70 {
71     throw Xapian::NetworkError("Insane message length specified!");
72 }
73 
74 XAPIAN_NORETURN(static void throw_timeout(const char*, const string&));
75 static void
throw_timeout(const char * msg,const string & context)76 throw_timeout(const char* msg, const string& context)
77 {
78     throw Xapian::NetworkTimeoutError(msg, context);
79 }
80 
81 #ifdef __WIN32__
82 static inline void
update_overlapped_offset(WSAOVERLAPPED & overlapped,DWORD n)83 update_overlapped_offset(WSAOVERLAPPED & overlapped, DWORD n)
84 {
85     if (add_overflows(overlapped.Offset, n, overlapped.Offset))
86 	++overlapped.OffsetHigh;
87 }
88 #endif
89 
RemoteConnection(int fdin_,int fdout_,const string & context_)90 RemoteConnection::RemoteConnection(int fdin_, int fdout_,
91 				   const string & context_)
92     : fdin(fdin_), fdout(fdout_), context(context_)
93 {
94 #ifdef __WIN32__
95     memset(&overlapped, 0, sizeof(overlapped));
96     overlapped.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
97     if (!overlapped.hEvent)
98 	throw Xapian::NetworkError("Failed to setup OVERLAPPED",
99 				   context, -int(GetLastError()));
100 
101 #endif
102 }
103 
104 #ifdef __WIN32__
~RemoteConnection()105 RemoteConnection::~RemoteConnection()
106 {
107     if (overlapped.hEvent)
108 	CloseHandle(overlapped.hEvent);
109 }
110 #endif
111 
112 bool
read_at_least(size_t min_len,double end_time)113 RemoteConnection::read_at_least(size_t min_len, double end_time)
114 {
115     LOGCALL(REMOTE, bool, "RemoteConnection::read_at_least", min_len | end_time);
116 
117     if (buffer.length() >= min_len) RETURN(true);
118 
119 #ifdef __WIN32__
120     HANDLE hin = fd_to_handle(fdin);
121     do {
122 	char buf[CHUNKSIZE];
123 	DWORD received;
124 	BOOL ok = ReadFile(hin, buf, sizeof(buf), &received, &overlapped);
125 	if (!ok) {
126 	    int errcode = GetLastError();
127 	    if (errcode != ERROR_IO_PENDING)
128 		throw Xapian::NetworkError("read failed", context, -errcode);
129 	    // Is asynch - just wait for the data to be received or a timeout.
130 	    DWORD waitrc;
131 	    waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
132 	    if (waitrc != WAIT_OBJECT_0) {
133 		LOGLINE(REMOTE, "read: timeout has expired");
134 		throw_timeout("Timeout expired while trying to read", context);
135 	    }
136 	    // Get the final result of the read.
137 	    if (!GetOverlappedResult(hin, &overlapped, &received, FALSE))
138 		throw Xapian::NetworkError("Failed to get overlapped result",
139 					   context, -int(GetLastError()));
140 	}
141 
142 	if (received == 0) {
143 	    RETURN(false);
144 	}
145 
146 	buffer.append(buf, received);
147 
148 	// We must update the offset in the OVERLAPPED structure manually.
149 	update_overlapped_offset(overlapped, received);
150     } while (buffer.length() < min_len);
151 #else
152     // If there's no end_time, just use blocking I/O.
153     if (fcntl(fdin, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
154 	throw Xapian::NetworkError("Failed to set fdin non-blocking-ness",
155 				   context, errno);
156     }
157 
158     while (true) {
159 	char buf[CHUNKSIZE];
160 	ssize_t received = read(fdin, buf, sizeof(buf));
161 
162 	if (received > 0) {
163 	    buffer.append(buf, received);
164 	    if (buffer.length() >= min_len) RETURN(true);
165 	    continue;
166 	}
167 
168 	if (received == 0) {
169 	    RETURN(false);
170 	}
171 
172 	LOGLINE(REMOTE, "read gave errno = " << errno);
173 	if (errno == EINTR) continue;
174 
175 	if (errno != EAGAIN)
176 	    throw Xapian::NetworkError("read failed", context, errno);
177 
178 	Assert(end_time != 0.0);
179 	while (true) {
180 	    // Calculate how far in the future end_time is.
181 	    double now = RealTime::now();
182 	    double time_diff = end_time - now;
183 	    // Check if the timeout has expired.
184 	    if (time_diff < 0) {
185 		LOGLINE(REMOTE, "read: timeout has expired");
186 		throw_timeout("Timeout expired while trying to read", context);
187 	    }
188 
189 	    // Wait until there is data, an error, or the timeout is reached.
190 # ifdef HAVE_POLL
191 	    struct pollfd fds;
192 	    fds.fd = fdin;
193 	    fds.events = POLLIN;
194 	    int poll_result = poll(&fds, 1, int(time_diff * 1000));
195 	    if (poll_result > 0) break;
196 
197 	    if (poll_result == 0)
198 		throw_timeout("Timeout expired while trying to read", context);
199 
200 	    // EINTR means poll was interrupted by a signal.  EAGAIN means that
201 	    // allocation of internal data structures failed.
202 	    if (errno != EINTR && errno != EAGAIN)
203 		throw Xapian::NetworkError("poll failed during read",
204 					   context, errno);
205 # else
206 	    if (fdin >= FD_SETSIZE) {
207 		// We can't block with a timeout, so just sleep and retry.
208 		RealTime::sleep(now + min(0.001, time_diff / 4));
209 		break;
210 	    }
211 	    fd_set fdset;
212 	    FD_ZERO(&fdset);
213 	    FD_SET(fdin, &fdset);
214 
215 	    struct timeval tv;
216 	    RealTime::to_timeval(time_diff, &tv);
217 	    int select_result = select(fdin + 1, &fdset, 0, 0, &tv);
218 	    if (select_result > 0) break;
219 
220 	    if (select_result == 0)
221 		throw_timeout("Timeout expired while trying to read", context);
222 
223 	    // EINTR means select was interrupted by a signal.  The Linux
224 	    // select(2) man page says: "Portable programs may wish to check
225 	    // for EAGAIN and loop, just as with EINTR" and that seems to be
226 	    // necessary for cygwin at least.
227 	    if (errno != EINTR && errno != EAGAIN)
228 		throw Xapian::NetworkError("select failed during read",
229 					   context, errno);
230 # endif
231 	}
232     }
233 #endif
234     RETURN(true);
235 }
236 
237 bool
ready_to_read() const238 RemoteConnection::ready_to_read() const
239 {
240     LOGCALL(REMOTE, bool, "RemoteConnection::ready_to_read", NO_ARGS);
241     if (fdin == -1)
242 	throw_database_closed();
243 
244     if (!buffer.empty()) RETURN(true);
245 
246     // See if there's data available to be read.
247 #ifdef HAVE_POLL
248     struct pollfd fds;
249     fds.fd = fdin;
250     fds.events = POLLIN;
251     RETURN(poll(&fds, 1, 0) > 0);
252 #else
253 # ifndef __WIN32__
254     if (fdin >= FD_SETSIZE) {
255 	// Not ideal, but OK for how this method is currently used.
256 	RETURN(true);
257     }
258 # endif
259     fd_set fdset;
260     FD_ZERO(&fdset);
261     FD_SET(fdin, &fdset);
262 
263     struct timeval tv;
264     tv.tv_sec = 0;
265     tv.tv_usec = 0;
266     RETURN(select(fdin + 1, &fdset, 0, 0, &tv) > 0);
267 #endif
268 }
269 
270 void
send_message(char type,const string & message,double end_time)271 RemoteConnection::send_message(char type, const string &message,
272 			       double end_time)
273 {
274     LOGCALL_VOID(REMOTE, "RemoteConnection::send_message", type | message | end_time);
275     if (fdout == -1)
276 	throw_database_closed();
277 
278     string header;
279     header += type;
280     header += encode_length(message.size());
281 
282 #ifdef __WIN32__
283     HANDLE hout = fd_to_handle(fdout);
284     const string * str = &header;
285 
286     size_t count = 0;
287     while (true) {
288 	DWORD n;
289 	BOOL ok = WriteFile(hout, str->data() + count, str->size() - count, &n, &overlapped);
290 	if (!ok) {
291 	    int errcode = GetLastError();
292 	    if (errcode != ERROR_IO_PENDING)
293 		throw Xapian::NetworkError("write failed", context, -errcode);
294 	    // Just wait for the data to be sent, or a timeout.
295 	    DWORD waitrc;
296 	    waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
297 	    if (waitrc != WAIT_OBJECT_0) {
298 		LOGLINE(REMOTE, "write: timeout has expired");
299 		throw_timeout("Timeout expired while trying to write", context);
300 	    }
301 	    // Get the final result.
302 	    if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
303 		throw Xapian::NetworkError("Failed to get overlapped result",
304 					   context, -int(GetLastError()));
305 	}
306 
307 	count += n;
308 
309 	// We must update the offset in the OVERLAPPED structure manually.
310 	update_overlapped_offset(overlapped, n);
311 
312 	if (count == str->size()) {
313 	    if (str == &message || message.empty()) return;
314 	    str = &message;
315 	    count = 0;
316 	}
317     }
318 #else
319     // If there's no end_time, just use blocking I/O.
320     if (fcntl(fdout, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
321 	throw Xapian::NetworkError("Failed to set fdout non-blocking-ness",
322 				   context, errno);
323     }
324 
325     const string * str = &header;
326 
327     size_t count = 0;
328     while (true) {
329 	// We've set write to non-blocking, so just try writing as there
330 	// will usually be space.
331 	ssize_t n = write(fdout, str->data() + count, str->size() - count);
332 
333 	if (n >= 0) {
334 	    count += n;
335 	    if (count == str->size()) {
336 		if (str == &message || message.empty()) return;
337 		str = &message;
338 		count = 0;
339 	    }
340 	    continue;
341 	}
342 
343 	LOGLINE(REMOTE, "write gave errno = " << errno);
344 	if (errno == EINTR) continue;
345 
346 	if (errno != EAGAIN)
347 	    throw Xapian::NetworkError("write failed", context, errno);
348 
349 	double now = RealTime::now();
350 	double time_diff = end_time - now;
351 	if (time_diff < 0) {
352 	    LOGLINE(REMOTE, "write: timeout has expired");
353 	    throw_timeout("Timeout expired while trying to write", context);
354 	}
355 
356 	// Wait until there is space or the timeout is reached.
357 # ifdef HAVE_POLL
358 	struct pollfd fds;
359 	fds.fd = fdout;
360 	fds.events = POLLOUT;
361 	int result = poll(&fds, 1, int(time_diff * 1000));
362 #  define POLLSELECT "poll"
363 # else
364 	if (fdout >= FD_SETSIZE) {
365 	    // We can't block with a timeout, so just sleep and retry.
366 	    RealTime::sleep(now + min(0.001, time_diff / 4));
367 	    continue;
368 	}
369 
370 	fd_set fdset;
371 	FD_ZERO(&fdset);
372 	FD_SET(fdout, &fdset);
373 
374 	struct timeval tv;
375 	RealTime::to_timeval(time_diff, &tv);
376 	int result = select(fdout + 1, 0, &fdset, 0, &tv);
377 #  define POLLSELECT "select"
378 # endif
379 
380 	if (result < 0) {
381 	    if (errno == EINTR || errno == EAGAIN) {
382 		// EINTR/EAGAIN means select was interrupted by a signal.
383 		// We could just retry the poll/select, but it's easier to just
384 		// retry the write.
385 		continue;
386 	    }
387 	    throw Xapian::NetworkError(POLLSELECT " failed during write",
388 				       context, errno);
389 # undef POLLSELECT
390 	}
391 
392 	if (result == 0)
393 	    throw_timeout("Timeout expired while trying to write", context);
394     }
395 #endif
396 }
397 
398 void
send_file(char type,int fd,double end_time)399 RemoteConnection::send_file(char type, int fd, double end_time)
400 {
401     LOGCALL_VOID(REMOTE, "RemoteConnection::send_file", type | fd | end_time);
402     if (fdout == -1)
403 	throw_database_closed();
404 
405     off_t size = file_size(fd);
406     if (errno)
407 	throw Xapian::NetworkError("Couldn't stat file to send", errno);
408     // FIXME: Use sendfile() or similar if available?
409 
410     char buf[CHUNKSIZE];
411     buf[0] = type;
412     size_t c = 1;
413     {
414 	string enc_size = encode_length(size);
415 	c += enc_size.size();
416 	// An encoded length should be just a few bytes.
417 	AssertRel(c, <=, sizeof(buf));
418 	memcpy(buf + 1, enc_size.data(), enc_size.size());
419     }
420 
421 #ifdef __WIN32__
422     HANDLE hout = fd_to_handle(fdout);
423     size_t count = 0;
424     while (true) {
425 	DWORD n;
426 	BOOL ok = WriteFile(hout, buf + count, c - count, &n, &overlapped);
427 	if (!ok) {
428 	    int errcode = GetLastError();
429 	    if (errcode != ERROR_IO_PENDING)
430 		throw Xapian::NetworkError("write failed", context, -errcode);
431 	    // Just wait for the data to be sent, or a timeout.
432 	    DWORD waitrc;
433 	    waitrc = WaitForSingleObject(overlapped.hEvent, calc_read_wait_msecs(end_time));
434 	    if (waitrc != WAIT_OBJECT_0) {
435 		LOGLINE(REMOTE, "write: timeout has expired");
436 		throw_timeout("Timeout expired while trying to write", context);
437 	    }
438 	    // Get the final result.
439 	    if (!GetOverlappedResult(hout, &overlapped, &n, FALSE))
440 		throw Xapian::NetworkError("Failed to get overlapped result",
441 					   context, -int(GetLastError()));
442 	}
443 
444 	count += n;
445 
446 	// We must update the offset in the OVERLAPPED structure manually.
447 	update_overlapped_offset(overlapped, n);
448 
449 	if (count == c) {
450 	    if (size == 0) return;
451 
452 	    ssize_t res;
453 	    do {
454 		res = read(fd, buf, sizeof(buf));
455 	    } while (res < 0 && errno == EINTR);
456 	    if (res < 0) throw Xapian::NetworkError("read failed", errno);
457 	    c = size_t(res);
458 
459 	    size -= c;
460 	    count = 0;
461 	}
462     }
463 #else
464     // If there's no end_time, just use blocking I/O.
465     if (fcntl(fdout, F_SETFL, (end_time != 0.0) ? O_NONBLOCK : 0) < 0) {
466 	throw Xapian::NetworkError("Failed to set fdout non-blocking-ness",
467 				   context, errno);
468     }
469 
470     size_t count = 0;
471     while (true) {
472 	// We've set write to non-blocking, so just try writing as there
473 	// will usually be space.
474 	ssize_t n = write(fdout, buf + count, c - count);
475 
476 	if (n >= 0) {
477 	    count += n;
478 	    if (count == c) {
479 		if (size == 0) return;
480 
481 		ssize_t res;
482 		do {
483 		    res = read(fd, buf, sizeof(buf));
484 		} while (res < 0 && errno == EINTR);
485 		if (res < 0) throw Xapian::NetworkError("read failed", errno);
486 		c = size_t(res);
487 
488 		size -= c;
489 		count = 0;
490 	    }
491 	    continue;
492 	}
493 
494 	LOGLINE(REMOTE, "write gave errno = " << errno);
495 	if (errno == EINTR) continue;
496 
497 	if (errno != EAGAIN)
498 	    throw Xapian::NetworkError("write failed", context, errno);
499 
500 	double now = RealTime::now();
501 	double time_diff = end_time - now;
502 	if (time_diff < 0) {
503 	    LOGLINE(REMOTE, "write: timeout has expired");
504 	    throw_timeout("Timeout expired while trying to write", context);
505 	}
506 
507 	// Wait until there is space or the timeout is reached.
508 # ifdef HAVE_POLL
509 	struct pollfd fds;
510 	fds.fd = fdout;
511 	fds.events = POLLOUT;
512 	int result = poll(&fds, 1, int(time_diff * 1000));
513 #  define POLLSELECT "poll"
514 # else
515 	if (fdout >= FD_SETSIZE) {
516 	    // We can't block with a timeout, so just sleep and retry.
517 	    RealTime::sleep(now + min(0.001, time_diff / 4));
518 	    continue;
519 	}
520 
521 	fd_set fdset;
522 	FD_ZERO(&fdset);
523 	FD_SET(fdout, &fdset);
524 
525 	struct timeval tv;
526 	RealTime::to_timeval(time_diff, &tv);
527 	int result = select(fdout + 1, 0, &fdset, 0, &tv);
528 #  define POLLSELECT "select"
529 # endif
530 
531 	if (result < 0) {
532 	    if (errno == EINTR || errno == EAGAIN) {
533 		// EINTR/EAGAIN means select was interrupted by a signal.
534 		// We could just retry the poll/select, but it's easier to just
535 		// retry the write.
536 		continue;
537 	    }
538 	    throw Xapian::NetworkError(POLLSELECT " failed during write",
539 				       context, errno);
540 # undef POLLSELECT
541 	}
542 
543 	if (result == 0)
544 	    throw_timeout("Timeout expired while trying to write", context);
545     }
546 #endif
547 }
548 
549 int
sniff_next_message_type(double end_time)550 RemoteConnection::sniff_next_message_type(double end_time)
551 {
552     LOGCALL(REMOTE, int, "RemoteConnection::sniff_next_message_type", end_time);
553     if (fdin == -1)
554 	throw_database_closed();
555 
556     if (!read_at_least(1, end_time))
557 	RETURN(-1);
558     unsigned char type = buffer[0];
559     RETURN(type);
560 }
561 
562 int
get_message(string & result,double end_time)563 RemoteConnection::get_message(string &result, double end_time)
564 {
565     LOGCALL(REMOTE, int, "RemoteConnection::get_message", result | end_time);
566     if (fdin == -1)
567 	throw_database_closed();
568 
569     if (!read_at_least(2, end_time))
570 	RETURN(-1);
571     size_t len = static_cast<unsigned char>(buffer[1]);
572     if (!read_at_least(len + 2, end_time))
573 	RETURN(-1);
574     if (len != 0xff) {
575 	result.assign(buffer.data() + 2, len);
576 	unsigned char type = buffer[0];
577 	buffer.erase(0, len + 2);
578 	RETURN(type);
579     }
580     len = 0;
581     string::const_iterator i = buffer.begin() + 2;
582     unsigned char ch;
583     int shift = 0;
584     do {
585 	if (i == buffer.end() || shift > 28) {
586 	    // Something is very wrong...
587 	    throw_network_error_insane_message_length();
588 	}
589 	ch = *i++;
590 	len |= size_t(ch & 0x7f) << shift;
591 	shift += 7;
592     } while ((ch & 0x80) == 0);
593     len += 255;
594     size_t header_len = (i - buffer.begin());
595     if (!read_at_least(header_len + len, end_time))
596 	RETURN(-1);
597     result.assign(buffer.data() + header_len, len);
598     unsigned char type = buffer[0];
599     buffer.erase(0, header_len + len);
600     RETURN(type);
601 }
602 
603 int
get_message_chunked(double end_time)604 RemoteConnection::get_message_chunked(double end_time)
605 {
606     LOGCALL(REMOTE, int, "RemoteConnection::get_message_chunked", end_time);
607 
608     if (fdin == -1)
609 	throw_database_closed();
610 
611     if (!read_at_least(2, end_time))
612 	RETURN(-1);
613     uint_least64_t len = static_cast<unsigned char>(buffer[1]);
614     if (len != 0xff) {
615 	chunked_data_left = off_t(len);
616 	char type = buffer[0];
617 	buffer.erase(0, 2);
618 	RETURN(type);
619     }
620     if (!read_at_least(len + 2, end_time))
621 	RETURN(-1);
622     len = 0;
623     string::const_iterator i = buffer.begin() + 2;
624     unsigned char ch;
625     int shift = 0;
626     do {
627 	// Allow at most 63 bits for message lengths - it's neatly a multiple
628 	// of 7 bits and anything longer than this is almost certainly a
629 	// corrupt value.
630 	// The value also needs to be representable as an
631 	// off_t (which is a signed type), so use that size if it is smaller.
632 	const int SHIFT_LIMIT = 63;
633 	if (rare(i == buffer.end() || shift >= SHIFT_LIMIT)) {
634 	    // Something is very wrong...
635 	    throw_network_error_insane_message_length();
636 	}
637 	ch = *i++;
638 	uint_least64_t bits = ch & 0x7f;
639 	len |= bits << shift;
640 	shift += 7;
641     } while ((ch & 0x80) == 0);
642     len += 255;
643 
644     chunked_data_left = off_t(len);
645     if (sizeof(off_t) * CHAR_BIT < 63) {
646 	// Check that the value of len fits in an off_t without loss.
647 	if (rare(uint_least64_t(chunked_data_left) != len)) {
648 	    throw_network_error_insane_message_length();
649 	}
650     }
651 
652     unsigned char type = buffer[0];
653     size_t header_len = (i - buffer.begin());
654     buffer.erase(0, header_len);
655     RETURN(type);
656 }
657 
658 int
get_message_chunk(string & result,size_t at_least,double end_time)659 RemoteConnection::get_message_chunk(string &result, size_t at_least,
660 				    double end_time)
661 {
662     LOGCALL(REMOTE, int, "RemoteConnection::get_message_chunk", result | at_least | end_time);
663     if (fdin == -1)
664 	throw_database_closed();
665 
666     if (at_least <= result.size()) RETURN(true);
667     at_least -= result.size();
668 
669     bool read_enough = (off_t(at_least) <= chunked_data_left);
670     if (!read_enough) at_least = size_t(chunked_data_left);
671 
672     if (!read_at_least(at_least, end_time))
673 	RETURN(-1);
674 
675     size_t retlen = min(off_t(buffer.size()), chunked_data_left);
676     result.append(buffer, 0, retlen);
677     buffer.erase(0, retlen);
678     chunked_data_left -= retlen;
679 
680     RETURN(int(read_enough));
681 }
682 
683 /** Write n bytes from block pointed to by p to file descriptor fd. */
684 static void
write_all(int fd,const char * p,size_t n)685 write_all(int fd, const char * p, size_t n)
686 {
687     while (n) {
688 	ssize_t c = write(fd, p, n);
689 	if (c < 0) {
690 	    if (errno == EINTR) continue;
691 	    throw Xapian::NetworkError("Error writing to file", errno);
692 	}
693 	p += c;
694 	n -= c;
695     }
696 }
697 
698 int
receive_file(const string & file,double end_time)699 RemoteConnection::receive_file(const string &file, double end_time)
700 {
701     LOGCALL(REMOTE, int, "RemoteConnection::receive_file", file | end_time);
702     if (fdin == -1)
703 	throw_database_closed();
704 
705     // FIXME: Do we want to be able to delete the file during writing?
706     FD fd(posixy_open(file.c_str(), O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, 0666));
707     if (fd == -1)
708 	throw Xapian::NetworkError("Couldn't open file for writing: " + file, errno);
709 
710     int type = get_message_chunked(end_time);
711     do {
712 	off_t min_read = min(chunked_data_left, off_t(CHUNKSIZE));
713 	if (!read_at_least(min_read, end_time))
714 	    RETURN(-1);
715 	write_all(fd, buffer.data(), min_read);
716 	chunked_data_left -= min_read;
717 	buffer.erase(0, min_read);
718     } while (chunked_data_left);
719     RETURN(type);
720 }
721 
722 void
shutdown()723 RemoteConnection::shutdown()
724 {
725     LOGCALL_VOID(REMOTE, "RemoteConnection::shutdown", NO_ARGS);
726 
727     if (fdin < 0) return;
728 
729     // We can be called from a destructor, so we can't throw an exception.
730     try {
731 	send_message(MSG_SHUTDOWN, string(), 0.0);
732 #ifdef __WIN32__
733 	HANDLE hin = fd_to_handle(fdin);
734 	char dummy;
735 	DWORD received;
736 	BOOL ok = ReadFile(hin, &dummy, 1, &received, &overlapped);
737 	if (!ok && GetLastError() == ERROR_IO_PENDING) {
738 	    // Wait for asynchronous read to complete.
739 	    (void)WaitForSingleObject(overlapped.hEvent, INFINITE);
740 	}
741 #else
742 	// Wait for the connection to be closed - when this happens
743 	// poll()/select() will report that a read won't block.
744 # ifdef HAVE_POLL
745 	struct pollfd fds;
746 	fds.fd = fdin;
747 	fds.events = POLLIN;
748 	int res;
749 	do {
750 	    res = poll(&fds, 1, -1);
751 	} while (res < 0 && (errno == EINTR || errno == EAGAIN));
752 # else
753 	if (fdin < FD_SETSIZE) {
754 	    fd_set fdset;
755 	    FD_ZERO(&fdset);
756 	    FD_SET(fdin, &fdset);
757 	    int res;
758 	    do {
759 		res = select(fdin + 1, &fdset, 0, 0, NULL);
760 	    } while (res < 0 && (errno == EINTR || errno == EAGAIN));
761 	}
762 # endif
763 #endif
764     } catch (...) {
765     }
766 }
767 
768 void
do_close()769 RemoteConnection::do_close()
770 {
771     LOGCALL_VOID(REMOTE, "RemoteConnection::do_close", NO_ARGS);
772 
773     if (fdin >= 0) {
774 	close_fd_or_socket(fdin);
775 
776 	// If the same fd is used in both directions, don't close it twice.
777 	if (fdin == fdout) fdout = -1;
778 
779 	fdin = -1;
780     }
781 
782     if (fdout >= 0) {
783 	close_fd_or_socket(fdout);
784 	fdout = -1;
785     }
786 }
787 
788 #ifdef __WIN32__
789 DWORD
calc_read_wait_msecs(double end_time)790 RemoteConnection::calc_read_wait_msecs(double end_time)
791 {
792     if (end_time == 0.0)
793 	return INFINITE;
794 
795     // Calculate how far in the future end_time is.
796     double time_diff = end_time - RealTime::now();
797 
798     // DWORD is unsigned, so we mustn't try and return a negative value.
799     if (time_diff < 0.0) {
800 	throw_timeout("Timeout expired before starting read", context);
801     }
802     return static_cast<DWORD>(time_diff * 1000.0);
803 }
804 #endif
805