1 /** @file flint_databasereplicator.cc
2 * @brief Support for flint database replication
3 */
4 /* Copyright 2008 Lemur Consulting Ltd
5 * Copyright 2009,2010,2015,2016 Olly Betts
6 * Copyright 2010 Richard Boulton
7 *
8 * This program is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU General Public License as
10 * published by the Free Software Foundation; either version 2 of the
11 * License, or (at your option) any later version.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
21 * USA
22 */
23
24 #include <config.h>
25
26 #include "flint_databasereplicator.h"
27
28 #include "xapian/error.h"
29
30 #include "../flint_lock.h"
31 #include "flint_record.h"
32 #include "flint_replicate_internal.h"
33 #include "flint_types.h"
34 #include "flint_utils.h"
35 #include "flint_version.h"
36 #include "debuglog.h"
37 #include "io_utils.h"
38 #include "noreturn.h"
39 #include "remoteconnection.h"
40 #include "replicate_utils.h"
41 #include "replicationprotocol.h"
42 #include "safeerrno.h"
43 #include "str.h"
44 #include "stringutils.h"
45 #include "utils.h"
46
47 #ifdef __WIN32__
48 # include "msvc_posix_wrapper.h"
49 #endif
50
51 #include <cstdio> // For rename().
52
53 XAPIAN_NORETURN(static void throw_connection_closed_unexpectedly());
54 static void
throw_connection_closed_unexpectedly()55 throw_connection_closed_unexpectedly()
56 {
57 throw Xapian::NetworkError("Connection closed unexpectedly");
58 }
59
60 using namespace std;
61 using namespace Xapian;
62
FlintDatabaseReplicator(const string & db_dir_)63 FlintDatabaseReplicator::FlintDatabaseReplicator(const string & db_dir_)
64 : db_dir(db_dir_),
65 max_changesets(0)
66 {
67 const char *p = getenv("XAPIAN_MAX_CHANGESETS");
68 if (p)
69 max_changesets = atoi(p);
70 }
71
72 bool
check_revision_at_least(const string & rev,const string & target) const73 FlintDatabaseReplicator::check_revision_at_least(const string & rev,
74 const string & target) const
75 {
76 LOGCALL(DB, bool, "FlintDatabaseReplicator::check_revision_at_least", rev | target);
77
78 flint_revision_number_t rev_val;
79 flint_revision_number_t target_val;
80
81 const char * ptr = rev.data();
82 const char * end = ptr + rev.size();
83 if (!F_unpack_uint(&ptr, end, &rev_val)) {
84 throw NetworkError("Invalid revision string supplied to check_revision_at_least");
85 }
86
87 ptr = target.data();
88 end = ptr + target.size();
89 if (!F_unpack_uint(&ptr, end, &target_val)) {
90 throw NetworkError("Invalid revision string supplied to check_revision_at_least");
91 }
92
93 RETURN(rev_val >= target_val);
94 }
95
96 void
process_changeset_chunk_base(const string & tablename,string & buf,RemoteConnection & conn,double end_time,int changes_fd) const97 FlintDatabaseReplicator::process_changeset_chunk_base(const string & tablename,
98 string & buf,
99 RemoteConnection & conn,
100 double end_time,
101 int changes_fd) const
102 {
103 const char *ptr = buf.data();
104 const char *end = ptr + buf.size();
105
106 // Get the letter
107 char letter = ptr[0];
108 if (letter != 'A' && letter != 'B')
109 throw NetworkError("Invalid base file letter in changeset");
110 ++ptr;
111
112
113 // Get the base size
114 if (ptr == end)
115 throw NetworkError("Unexpected end of changeset (5)");
116 string::size_type base_size;
117 if (!F_unpack_uint(&ptr, end, &base_size))
118 throw NetworkError("Invalid base file size in changeset");
119
120 // Get the new base file into buf.
121 write_and_clear_changes(changes_fd, buf, ptr - buf.data());
122 int res = conn.get_message_chunk(buf, base_size, end_time);
123 if (res <= 0) {
124 if (res < 0)
125 throw_connection_closed_unexpectedly();
126 throw NetworkError("Unexpected end of changeset (6)");
127 }
128
129 // Write base_size bytes from start of buf to base file for tablename
130 string tmp_path = db_dir + "/" + tablename + "tmp";
131 string base_path = db_dir + "/" + tablename + ".base" + letter;
132 #ifdef __WIN32__
133 int fd = msvc_posix_open(tmp_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY);
134 #else
135 int fd = ::open(tmp_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0666);
136 #endif
137 if (fd == -1) {
138 string msg = "Failed to open ";
139 msg += tmp_path;
140 throw DatabaseError(msg, errno);
141 }
142 {
143 fdcloser closer(fd);
144
145 io_write(fd, buf.data(), base_size);
146 io_sync(fd);
147 }
148
149 // Finish writing the changeset before moving the base file into place.
150 write_and_clear_changes(changes_fd, buf, base_size);
151
152 if (!io_tmp_rename(tmp_path, base_path)) {
153 string msg("Couldn't update base file ");
154 msg += tablename;
155 msg += ".base";
156 msg += letter;
157 throw DatabaseError(msg, errno);
158 }
159 }
160
161 void
process_changeset_chunk_blocks(const string & tablename,string & buf,RemoteConnection & conn,double end_time,int changes_fd) const162 FlintDatabaseReplicator::process_changeset_chunk_blocks(const string & tablename,
163 string & buf,
164 RemoteConnection & conn,
165 double end_time,
166 int changes_fd) const
167 {
168 const char *ptr = buf.data();
169 const char *end = ptr + buf.size();
170
171 unsigned int changeset_blocksize;
172 if (!F_unpack_uint(&ptr, end, &changeset_blocksize))
173 throw NetworkError("Invalid blocksize in changeset");
174 write_and_clear_changes(changes_fd, buf, ptr - buf.data());
175
176 string db_path = db_dir + "/" + tablename + ".DB";
177 #ifdef __WIN32__
178 int fd = msvc_posix_open(db_path.c_str(), O_WRONLY | O_BINARY);
179 #else
180 int fd = ::open(db_path.c_str(), O_WRONLY | O_BINARY, 0666);
181 #endif
182 if (fd == -1) {
183 if (file_exists(db_path)) {
184 string msg = "Failed to open ";
185 msg += db_path;
186 throw DatabaseError(msg, errno);
187 }
188 #ifdef __WIN32__
189 fd = msvc_posix_open(db_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY);
190 #else
191 fd = ::open(db_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0666);
192 #endif
193 if (fd == -1) {
194 string msg = "Failed to create and open ";
195 msg += db_path;
196 throw DatabaseError(msg, errno);
197 }
198 }
199 {
200 fdcloser closer(fd);
201
202 while (true) {
203 if (conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time) < 0)
204 throw_connection_closed_unexpectedly();
205
206 ptr = buf.data();
207 end = ptr + buf.size();
208
209 uint4 block_number;
210 if (!F_unpack_uint(&ptr, end, &block_number))
211 throw NetworkError("Invalid block number in changeset");
212 write_and_clear_changes(changes_fd, buf, ptr - buf.data());
213 if (block_number == 0)
214 break;
215 --block_number;
216
217 int res = conn.get_message_chunk(buf, changeset_blocksize, end_time);
218 if (res <= 0) {
219 if (res < 0)
220 throw_connection_closed_unexpectedly();
221 throw NetworkError("Incomplete block in changeset");
222 }
223
224 // Write the block.
225 // FIXME - should use pwrite if that's available.
226 if (lseek(fd, off_t(changeset_blocksize) * block_number, SEEK_SET) == -1) {
227 string msg = "Failed to seek to block ";
228 msg += str(block_number);
229 throw DatabaseError(msg, errno);
230 }
231 io_write(fd, buf.data(), changeset_blocksize);
232
233 write_and_clear_changes(changes_fd, buf, changeset_blocksize);
234 }
235 io_sync(fd);
236 }
237 }
238
239 string
apply_changeset_from_conn(RemoteConnection & conn,double end_time,bool valid) const240 FlintDatabaseReplicator::apply_changeset_from_conn(RemoteConnection & conn,
241 double end_time,
242 bool valid) const
243 {
244 LOGCALL(DB, string, "FlintDatabaseReplicator::apply_changeset_from_conn", conn | end_time | valid);
245
246 // Lock the database to perform modifications.
247 FlintLock lock(db_dir);
248 string explanation;
249 FlintLock::reason why = lock.lock(true, explanation);
250 if (why != FlintLock::SUCCESS) {
251 lock.throw_databaselockerror(why, db_dir, explanation);
252 }
253
254 int type = conn.get_message_chunked(end_time);
255 if (type == EOF)
256 throw_connection_closed_unexpectedly();
257 AssertEq(type, REPL_REPLY_CHANGESET);
258
259 string buf;
260 // Read enough to be certain that we've got the header part of the
261 // changeset.
262
263 if (conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time) < 0)
264 throw_connection_closed_unexpectedly();
265 // Check the magic string.
266 if (!startswith(buf, CHANGES_MAGIC_STRING)) {
267 throw NetworkError("Invalid ChangeSet magic string");
268 }
269 const char *ptr = buf.data();
270 const char *end = ptr + buf.size();
271 ptr += CONST_STRLEN(CHANGES_MAGIC_STRING);
272
273 unsigned int changes_version;
274 if (!F_unpack_uint(&ptr, end, &changes_version))
275 throw NetworkError("Couldn't read a valid version number from changeset");
276 if (changes_version != CHANGES_VERSION)
277 throw NetworkError("Unsupported changeset version");
278
279 flint_revision_number_t startrev;
280 flint_revision_number_t endrev;
281
282 if (!F_unpack_uint(&ptr, end, &startrev))
283 throw NetworkError("Couldn't read a valid start revision from changeset");
284 if (!F_unpack_uint(&ptr, end, &endrev))
285 throw NetworkError("Couldn't read a valid end revision from changeset");
286
287 if (endrev <= startrev)
288 throw NetworkError("End revision in changeset is not later than start revision");
289
290 if (ptr == end)
291 throw NetworkError("Unexpected end of changeset (1)");
292
293 int changes_fd = -1;
294 string changes_name;
295 if (max_changesets > 0) {
296 changes_fd = create_changeset_file(db_dir, "changes" + str(startrev),
297 changes_name);
298 }
299 fdcloser closer(changes_fd);
300
301 if (valid) {
302 // Check the revision number.
303 // If the database was not known to be valid, we cannot
304 // reliably determine its revision number, so must skip this
305 // check.
306 FlintRecordTable record_table(db_dir, true);
307 record_table.open();
308 if (startrev != record_table.get_open_revision_number())
309 throw NetworkError("Changeset supplied is for wrong revision number");
310 }
311
312 unsigned char changes_type = ptr[0];
313 if (changes_type != 0) {
314 throw NetworkError("Unsupported changeset type: " + str(changes_type));
315 // FIXME - support changes of type 1, produced when DANGEROUS mode is
316 // on.
317 }
318
319 // Write and clear the bits of the buffer which have been read.
320 write_and_clear_changes(changes_fd, buf, ptr + 1 - buf.data());
321
322 // Read the items from the changeset.
323 while (true) {
324 if (conn.get_message_chunk(buf, REASONABLE_CHANGESET_SIZE, end_time) < 0)
325 throw_connection_closed_unexpectedly();
326 ptr = buf.data();
327 end = ptr + buf.size();
328
329 // Read the type of the next chunk of data
330 if (ptr == end)
331 throw NetworkError("Unexpected end of changeset (2)");
332 unsigned char chunk_type = ptr[0];
333 ++ptr;
334 if (chunk_type == 0)
335 break;
336
337 // Get the tablename.
338 string tablename;
339 if (!F_unpack_string(&ptr, end, tablename))
340 throw NetworkError("Unexpected end of changeset (3)");
341 if (tablename.empty())
342 throw NetworkError("Missing tablename in changeset");
343 if (tablename.find_first_not_of("abcdefghijklmnopqrstuvwxyz") !=
344 tablename.npos)
345 throw NetworkError("Invalid character in tablename in changeset");
346
347 // Process the chunk
348 if (ptr == end)
349 throw NetworkError("Unexpected end of changeset (4)");
350 write_and_clear_changes(changes_fd, buf, ptr - buf.data());
351
352 switch (chunk_type) {
353 case 1:
354 process_changeset_chunk_base(tablename, buf, conn, end_time,
355 changes_fd);
356 break;
357 case 2:
358 process_changeset_chunk_blocks(tablename, buf, conn, end_time,
359 changes_fd);
360 break;
361 default:
362 throw NetworkError("Unrecognised item type in changeset");
363 }
364 }
365 flint_revision_number_t reqrev;
366 if (!F_unpack_uint(&ptr, end, &reqrev))
367 throw NetworkError("Couldn't read a valid required revision from changeset");
368 if (reqrev < endrev)
369 throw NetworkError("Required revision in changeset is earlier than end revision");
370 if (ptr != end)
371 throw NetworkError("Junk found at end of changeset");
372
373 write_and_clear_changes(changes_fd, buf, buf.size());
374 buf = F_pack_uint(reqrev);
375 RETURN(buf);
376 }
377
378 string
get_uuid() const379 FlintDatabaseReplicator::get_uuid() const
380 {
381 LOGCALL(DB, string, "FlintDatabaseReplicator::get_uuid", NO_ARGS);
382 FlintVersion version_file(db_dir);
383 try {
384 version_file.read_and_check(true);
385 } catch (const DatabaseError &) {
386 RETURN(string());
387 }
388 RETURN(version_file.get_uuid_string());
389 }
390