1 /** @file flint_databasereplicator.cc
2  * @brief Support for flint database replication
3  */
4 /* Copyright 2008 Lemur Consulting Ltd
5  * Copyright 2009,2010,2015,2016 Olly Betts
6  * Copyright 2010 Richard Boulton
7  *
8  * This program is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU General Public License as
10  * published by the Free Software Foundation; either version 2 of the
11  * License, or (at your option) any later version.
12  *
13  * This program is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License
19  * along with this program; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301
21  * USA
22  */
23 
24 #include <config.h>
25 
26 #include "flint_databasereplicator.h"
27 
28 #include "xapian/error.h"
29 
30 #include "../flint_lock.h"
31 #include "flint_record.h"
32 #include "flint_replicate_internal.h"
33 #include "flint_types.h"
34 #include "flint_utils.h"
35 #include "flint_version.h"
36 #include "debuglog.h"
37 #include "io_utils.h"
38 #include "noreturn.h"
39 #include "remoteconnection.h"
40 #include "replicate_utils.h"
41 #include "replicationprotocol.h"
42 #include "safeerrno.h"
43 #include "str.h"
44 #include "stringutils.h"
45 #include "utils.h"
46 
47 #ifdef __WIN32__
48 # include "msvc_posix_wrapper.h"
49 #endif
50 
51 #include <cstdio> // For rename().
52 
53 XAPIAN_NORETURN(static void throw_connection_closed_unexpectedly());
54 static void
throw_connection_closed_unexpectedly()55 throw_connection_closed_unexpectedly()
56 {
57     throw Xapian::NetworkError("Connection closed unexpectedly");
58 }
59 
60 using namespace std;
61 using namespace Xapian;
62 
FlintDatabaseReplicator(const string & db_dir_)63 FlintDatabaseReplicator::FlintDatabaseReplicator(const string & db_dir_)
64 	: db_dir(db_dir_),
65 	  max_changesets(0)
66 {
67     const char *p = getenv("XAPIAN_MAX_CHANGESETS");
68     if (p)
69 	max_changesets = atoi(p);
70 }
71 
72 bool
check_revision_at_least(const string & rev,const string & target) const73 FlintDatabaseReplicator::check_revision_at_least(const string & rev,
74 						 const string & target) const
75 {
76     LOGCALL(DB, bool, "FlintDatabaseReplicator::check_revision_at_least", rev | target);
77 
78     flint_revision_number_t rev_val;
79     flint_revision_number_t target_val;
80 
81     const char * ptr = rev.data();
82     const char * end = ptr + rev.size();
83     if (!F_unpack_uint(&ptr, end, &rev_val)) {
84 	throw NetworkError("Invalid revision string supplied to check_revision_at_least");
85     }
86 
87     ptr = target.data();
88     end = ptr + target.size();
89     if (!F_unpack_uint(&ptr, end, &target_val)) {
90 	throw NetworkError("Invalid revision string supplied to check_revision_at_least");
91     }
92 
93     RETURN(rev_val >= target_val);
94 }
95 
96 void
process_changeset_chunk_base(const string & tablename,string & buf,RemoteConnection & conn,double end_time,int changes_fd) const97 FlintDatabaseReplicator::process_changeset_chunk_base(const string & tablename,
98 						      string & buf,
99 						      RemoteConnection & conn,
100 						      double end_time,
101 						      int changes_fd) const
102 {
103     const char *ptr = buf.data();
104     const char *end = ptr + buf.size();
105 
106     // Get the letter
107     char letter = ptr[0];
108     if (letter != 'A' && letter != 'B')
109 	throw NetworkError("Invalid base file letter in changeset");
110     ++ptr;
111 
112 
113     // Get the base size
114     if (ptr == end)
115 	throw NetworkError("Unexpected end of changeset (5)");
116     string::size_type base_size;
117     if (!F_unpack_uint(&ptr, end, &base_size))
118 	throw NetworkError("Invalid base file size in changeset");
119 
120     // Get the new base file into buf.
121     write_and_clear_changes(changes_fd, buf, ptr - buf.data());
122     int res = conn.get_message_chunk(buf, base_size, end_time);
123     if (res <= 0) {
124 	if (res < 0)
125 	    throw_connection_closed_unexpectedly();
126 	throw NetworkError("Unexpected end of changeset (6)");
127     }
128 
129     // Write base_size bytes from start of buf to base file for tablename
130     string tmp_path = db_dir + "/" + tablename + "tmp";
131     string base_path = db_dir + "/" + tablename + ".base" + letter;
132 #ifdef __WIN32__
133     int fd = msvc_posix_open(tmp_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY);
134 #else
135     int fd = ::open(tmp_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0666);
136 #endif
137     if (fd == -1) {
138 	string msg = "Failed to open ";
139 	msg += tmp_path;
140 	throw DatabaseError(msg, errno);
141     }
142     {
143 	fdcloser closer(fd);
144 
145 	io_write(fd, buf.data(), base_size);
146 	io_sync(fd);
147     }
148 
149     // Finish writing the changeset before moving the base file into place.
150     write_and_clear_changes(changes_fd, buf, base_size);
151 
152     if (!io_tmp_rename(tmp_path, base_path)) {
153 	string msg("Couldn't update base file ");
154 	msg += tablename;
155 	msg += ".base";
156 	msg += letter;
157 	throw DatabaseError(msg, errno);
158     }
159 }
160 
161 void
process_changeset_chunk_blocks(const string & tablename,string & buf,RemoteConnection & conn,double end_time,int changes_fd) const162 FlintDatabaseReplicator::process_changeset_chunk_blocks(const string & tablename,
163 							string & buf,
164 							RemoteConnection & conn,
165 							double end_time,
166 							int changes_fd) const
167 {
168     const char *ptr = buf.data();
169     const char *end = ptr + buf.size();
170 
171     unsigned int changeset_blocksize;
172     if (!F_unpack_uint(&ptr, end, &changeset_blocksize))
173 	throw NetworkError("Invalid blocksize in changeset");
174     write_and_clear_changes(changes_fd, buf, ptr - buf.data());
175 
176     string db_path = db_dir + "/" + tablename + ".DB";
177 #ifdef __WIN32__
178     int fd = msvc_posix_open(db_path.c_str(), O_WRONLY | O_BINARY);
179 #else
180     int fd = ::open(db_path.c_str(), O_WRONLY | O_BINARY, 0666);
181 #endif
182     if (fd == -1) {
183 	if (file_exists(db_path)) {
184 	    string msg = "Failed to open ";
185 	    msg += db_path;
186 	    throw DatabaseError(msg, errno);
187 	}
188 #ifdef __WIN32__
189 	fd = msvc_posix_open(db_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY);
190 #else
191 	fd = ::open(db_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0666);
192 #endif
193 	if (fd == -1) {
194 	    string msg = "Failed to create and open ";
195 	    msg += db_path;
196 	    throw DatabaseError(msg, errno);
197 	}
198     }
199     {
200 	fdcloser closer(fd);
201 
202 	while (true) {
203 	    if (conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time) < 0)
204 		throw_connection_closed_unexpectedly();
205 
206 	    ptr = buf.data();
207 	    end = ptr + buf.size();
208 
209 	    uint4 block_number;
210 	    if (!F_unpack_uint(&ptr, end, &block_number))
211 		throw NetworkError("Invalid block number in changeset");
212 	    write_and_clear_changes(changes_fd, buf, ptr - buf.data());
213 	    if (block_number == 0)
214 		break;
215 	    --block_number;
216 
217 	    int res = conn.get_message_chunk(buf, changeset_blocksize, end_time);
218 	    if (res <= 0) {
219 		if (res < 0)
220 		    throw_connection_closed_unexpectedly();
221 		throw NetworkError("Incomplete block in changeset");
222 	    }
223 
224 	    // Write the block.
225 	    // FIXME - should use pwrite if that's available.
226 	    if (lseek(fd, off_t(changeset_blocksize) * block_number, SEEK_SET) == -1) {
227 		string msg = "Failed to seek to block ";
228 		msg += str(block_number);
229 		throw DatabaseError(msg, errno);
230 	    }
231 	    io_write(fd, buf.data(), changeset_blocksize);
232 
233 	    write_and_clear_changes(changes_fd, buf, changeset_blocksize);
234 	}
235 	io_sync(fd);
236     }
237 }
238 
239 string
apply_changeset_from_conn(RemoteConnection & conn,double end_time,bool valid) const240 FlintDatabaseReplicator::apply_changeset_from_conn(RemoteConnection & conn,
241 						   double end_time,
242 						   bool valid) const
243 {
244     LOGCALL(DB, string, "FlintDatabaseReplicator::apply_changeset_from_conn", conn | end_time | valid);
245 
246     // Lock the database to perform modifications.
247     FlintLock lock(db_dir);
248     string explanation;
249     FlintLock::reason why = lock.lock(true, explanation);
250     if (why != FlintLock::SUCCESS) {
251 	lock.throw_databaselockerror(why, db_dir, explanation);
252     }
253 
254     int type = conn.get_message_chunked(end_time);
255     if (type == EOF)
256 	throw_connection_closed_unexpectedly();
257     AssertEq(type, REPL_REPLY_CHANGESET);
258 
259     string buf;
260     // Read enough to be certain that we've got the header part of the
261     // changeset.
262 
263     if (conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time) < 0)
264 	throw_connection_closed_unexpectedly();
265     // Check the magic string.
266     if (!startswith(buf, CHANGES_MAGIC_STRING)) {
267 	throw NetworkError("Invalid ChangeSet magic string");
268     }
269     const char *ptr = buf.data();
270     const char *end = ptr + buf.size();
271     ptr += CONST_STRLEN(CHANGES_MAGIC_STRING);
272 
273     unsigned int changes_version;
274     if (!F_unpack_uint(&ptr, end, &changes_version))
275 	throw NetworkError("Couldn't read a valid version number from changeset");
276     if (changes_version != CHANGES_VERSION)
277 	throw NetworkError("Unsupported changeset version");
278 
279     flint_revision_number_t startrev;
280     flint_revision_number_t endrev;
281 
282     if (!F_unpack_uint(&ptr, end, &startrev))
283 	throw NetworkError("Couldn't read a valid start revision from changeset");
284     if (!F_unpack_uint(&ptr, end, &endrev))
285 	throw NetworkError("Couldn't read a valid end revision from changeset");
286 
287     if (endrev <= startrev)
288 	throw NetworkError("End revision in changeset is not later than start revision");
289 
290     if (ptr == end)
291 	throw NetworkError("Unexpected end of changeset (1)");
292 
293     int changes_fd = -1;
294     string changes_name;
295     if (max_changesets > 0) {
296 	changes_fd = create_changeset_file(db_dir, "changes" + str(startrev),
297 					   changes_name);
298     }
299     fdcloser closer(changes_fd);
300 
301     if (valid) {
302 	// Check the revision number.
303 	// If the database was not known to be valid, we cannot
304 	// reliably determine its revision number, so must skip this
305 	// check.
306 	FlintRecordTable record_table(db_dir, true);
307 	record_table.open();
308 	if (startrev != record_table.get_open_revision_number())
309 	    throw NetworkError("Changeset supplied is for wrong revision number");
310     }
311 
312     unsigned char changes_type = ptr[0];
313     if (changes_type != 0) {
314 	throw NetworkError("Unsupported changeset type: " + str(changes_type));
315 	// FIXME - support changes of type 1, produced when DANGEROUS mode is
316 	// on.
317     }
318 
319     // Write and clear the bits of the buffer which have been read.
320     write_and_clear_changes(changes_fd, buf, ptr + 1 - buf.data());
321 
322     // Read the items from the changeset.
323     while (true) {
324 	if (conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time) < 0)
325 	    throw_connection_closed_unexpectedly();
326 	ptr = buf.data();
327 	end = ptr + buf.size();
328 
329 	// Read the type of the next chunk of data
330 	if (ptr == end)
331 	    throw NetworkError("Unexpected end of changeset (2)");
332 	unsigned char chunk_type = ptr[0];
333 	++ptr;
334 	if (chunk_type == 0)
335 	    break;
336 
337 	// Get the tablename.
338 	string tablename;
339 	if (!F_unpack_string(&ptr, end, tablename))
340 	    throw NetworkError("Unexpected end of changeset (3)");
341 	if (tablename.empty())
342 	    throw NetworkError("Missing tablename in changeset");
343 	if (tablename.find_first_not_of("abcdefghijklmnopqrstuvwxyz") !=
344 	    tablename.npos)
345 	    throw NetworkError("Invalid character in tablename in changeset");
346 
347 	// Process the chunk
348 	if (ptr == end)
349 	    throw NetworkError("Unexpected end of changeset (4)");
350 	write_and_clear_changes(changes_fd, buf, ptr - buf.data());
351 
352 	switch (chunk_type) {
353 	    case 1:
354 		process_changeset_chunk_base(tablename, buf, conn, end_time,
355 					     changes_fd);
356 		break;
357 	    case 2:
358 		process_changeset_chunk_blocks(tablename, buf, conn, end_time,
359 					       changes_fd);
360 		break;
361 	    default:
362 		throw NetworkError("Unrecognised item type in changeset");
363 	}
364     }
365     flint_revision_number_t reqrev;
366     if (!F_unpack_uint(&ptr, end, &reqrev))
367 	throw NetworkError("Couldn't read a valid required revision from changeset");
368     if (reqrev < endrev)
369 	throw NetworkError("Required revision in changeset is earlier than end revision");
370     if (ptr != end)
371 	throw NetworkError("Junk found at end of changeset");
372 
373     write_and_clear_changes(changes_fd, buf, buf.size());
374     buf = F_pack_uint(reqrev);
375     RETURN(buf);
376 }
377 
378 string
get_uuid() const379 FlintDatabaseReplicator::get_uuid() const
380 {
381     LOGCALL(DB, string, "FlintDatabaseReplicator::get_uuid", NO_ARGS);
382     FlintVersion version_file(db_dir);
383     try {
384 	version_file.read_and_check(true);
385     } catch (const DatabaseError &) {
386 	RETURN(string());
387     }
388     RETURN(version_file.get_uuid_string());
389 }
390