1 #include "gossip_store.h"
2 
3 #include <ccan/array_size/array_size.h>
4 #include <ccan/crc32c/crc32c.h>
5 #include <ccan/noerr/noerr.h>
6 #include <ccan/read_write_all/read_write_all.h>
7 #include <ccan/tal/str/str.h>
8 #include <common/gossip_store.h>
9 #include <common/private_channel_announcement.h>
10 #include <common/status.h>
11 #include <errno.h>
12 #include <fcntl.h>
13 #include <gossipd/gossip_store_wiregen.h>
14 #include <sys/stat.h>
15 #include <sys/uio.h>
16 #include <unistd.h>
17 #include <wire/peer_wire.h>
18 
19 #define GOSSIP_STORE_TEMP_FILENAME "gossip_store.tmp"
20 
21 struct gossip_store {
22 	/* This is false when we're loading */
23 	bool writable;
24 
25 	int fd;
26 	u8 version;
27 
28 	/* Offset of current EOF */
29 	u64 len;
30 
31 	/* Counters for entries in the gossip_store entries. This is used to
32 	 * decide whether we should rewrite the on-disk store or not.
33 	 * Note: count includes deleted. */
34 	size_t count, deleted;
35 
36 	/* Handle to the routing_state to retrieve additional information,
37 	 * should it be needed */
38 	struct routing_state *rstate;
39 
40 	/* This is daemon->peers for handling to update_peers_broadcast_index */
41 	struct list_head *peers;
42 
43 	/* Disable compaction if we encounter an error during a prior
44 	 * compaction */
45 	bool disable_compaction;
46 
47 	/* Timestamp of store when we opened it (0 if we created it) */
48 	u32 timestamp;
49 };
50 
gossip_store_destroy(struct gossip_store * gs)51 static void gossip_store_destroy(struct gossip_store *gs)
52 {
53 	close(gs->fd);
54 }
55 
56 #if HAVE_PWRITEV
57 /* One fewer syscall for the win! */
gossip_pwritev(int fd,const struct iovec * iov,int iovcnt,off_t offset)58 static ssize_t gossip_pwritev(int fd, const struct iovec *iov, int iovcnt,
59 			      off_t offset)
60 {
61 	return pwritev(fd, iov, iovcnt, offset);
62 }
63 #else /* Hello MacOS! */
gossip_pwritev(int fd,const struct iovec * iov,int iovcnt,off_t offset)64 static ssize_t gossip_pwritev(int fd, const struct iovec *iov, int iovcnt,
65 			      off_t offset)
66 {
67 	if (lseek(fd, offset, SEEK_SET) != offset)
68 		return -1;
69 	return writev(fd, iov, iovcnt);
70 }
71 #endif /* !HAVE_PWRITEV */
72 
append_msg(int fd,const u8 * msg,u32 timestamp,bool push,u64 * len)73 static bool append_msg(int fd, const u8 *msg, u32 timestamp,
74 		       bool push, u64 *len)
75 {
76 	struct gossip_hdr hdr;
77 	u32 msglen;
78 	struct iovec iov[2];
79 
80 	/* Don't ever overwrite the version header! */
81 	assert(*len);
82 
83 	msglen = tal_count(msg);
84 	hdr.len = cpu_to_be32(msglen);
85 	if (push)
86 		hdr.len |= CPU_TO_BE32(GOSSIP_STORE_LEN_PUSH_BIT);
87 	hdr.crc = cpu_to_be32(crc32c(timestamp, msg, msglen));
88 	hdr.timestamp = cpu_to_be32(timestamp);
89 
90 	/* pwritev makes it more likely to appear at once, plus it's
91 	 * exactly what we want. */
92 	iov[0].iov_base = &hdr;
93 	iov[0].iov_len = sizeof(hdr);
94 	iov[1].iov_base = (void *)msg;
95 	iov[1].iov_len = msglen;
96 	if (gossip_pwritev(fd, iov, ARRAY_SIZE(iov), *len) != sizeof(hdr) + msglen)
97 		return false;
98 	*len += sizeof(hdr) + msglen;
99 	return true;
100 }
101 
102 #ifdef COMPAT_V082
mk_private_channelmsg(const tal_t * ctx,struct routing_state * rstate,const struct short_channel_id * scid,const struct node_id * remote_node_id,struct amount_sat sat,const u8 * features)103 static u8 *mk_private_channelmsg(const tal_t *ctx,
104 				 struct routing_state *rstate,
105 				 const struct short_channel_id *scid,
106 				 const struct node_id *remote_node_id,
107 				 struct amount_sat sat,
108 				 const u8 *features)
109 {
110 	const u8 *ann = private_channel_announcement(tmpctx, scid,
111 						     &rstate->local_id,
112 						     remote_node_id,
113 						     features);
114 
115 	return towire_gossip_store_private_channel(ctx, sat, ann);
116 }
117 
118 /* The upgrade from version 7 is trivial */
can_upgrade(u8 oldversion)119 static bool can_upgrade(u8 oldversion)
120 {
121 	return oldversion == 7 || oldversion == 8;
122 }
123 
upgrade_field(u8 oldversion,struct routing_state * rstate,u8 ** msg)124 static bool upgrade_field(u8 oldversion,
125 			  struct routing_state *rstate,
126 			  u8 **msg)
127 {
128 	assert(can_upgrade(oldversion));
129 
130 	if (fromwire_peektype(*msg) == WIRE_GOSSIPD_LOCAL_ADD_CHANNEL_OBS
131 	    && oldversion == 7) {
132 		/* Append two 0 bytes, for (empty) feature bits */
133 		tal_resizez(msg, tal_bytelen(*msg) + 2);
134 	}
135 
136 	/* We turn these (v8) into a WIRE_GOSSIP_STORE_PRIVATE_CHANNEL */
137 	if (fromwire_peektype(*msg) == WIRE_GOSSIPD_LOCAL_ADD_CHANNEL_OBS) {
138 		struct short_channel_id scid;
139 		struct node_id remote_node_id;
140 		struct amount_sat satoshis;
141 		u8 *features;
142 		u8 *storemsg;
143 
144 		if (!fromwire_gossipd_local_add_channel_obs(tmpctx, *msg,
145 							&scid,
146 							&remote_node_id,
147 							&satoshis,
148 							&features))
149 			return false;
150 
151 		storemsg = mk_private_channelmsg(tal_parent(*msg),
152 						 rstate,
153 						 &scid,
154 						 &remote_node_id,
155 						 satoshis,
156 						 features);
157 		tal_free(*msg);
158 		*msg = storemsg;
159 	}
160 	return true;
161 }
162 #else
can_upgrade(u8 oldversion)163 static bool can_upgrade(u8 oldversion)
164 {
165 	return false;
166 }
167 
upgrade_field(u8 oldversion,struct routing_state * rstate,u8 ** msg)168 static bool upgrade_field(u8 oldversion,
169 			  struct routing_state *rstate,
170 			  u8 **msg)
171 {
172 	abort();
173 }
174 #endif /* !COMPAT_V082 */
175 
176 /* Read gossip store entries, copy non-deleted ones.  This code is written
177  * as simply and robustly as possible! */
gossip_store_compact_offline(struct routing_state * rstate)178 static u32 gossip_store_compact_offline(struct routing_state *rstate)
179 {
180 	size_t count = 0, deleted = 0;
181 	int old_fd, new_fd;
182 	u64 oldlen, newlen;
183 	struct gossip_hdr hdr;
184 	u8 oldversion, version = GOSSIP_STORE_VERSION;
185 	struct stat st;
186 
187 	old_fd = open(GOSSIP_STORE_FILENAME, O_RDWR);
188 	if (old_fd == -1)
189 		return 0;
190 
191 	if (fstat(old_fd, &st) != 0) {
192 		status_broken("Could not stat gossip_store: %s",
193 			      strerror(errno));
194 		goto close_old;
195 	}
196 
197 	new_fd = open(GOSSIP_STORE_TEMP_FILENAME, O_RDWR|O_TRUNC|O_CREAT, 0600);
198 	if (new_fd < 0) {
199 		status_broken(
200 		    "Could not open file for gossip_store compaction");
201 		goto close_old;
202 	}
203 
204 	if (!read_all(old_fd, &oldversion, sizeof(oldversion))
205 	    || (oldversion != version && !can_upgrade(oldversion))) {
206 		status_broken("gossip_store_compact: bad version");
207 		goto close_and_delete;
208 	}
209 
210 	if (!write_all(new_fd, &version, sizeof(version))) {
211 		status_broken("gossip_store_compact_offline: writing version to store: %s",
212 			      strerror(errno));
213 		goto close_and_delete;
214 	}
215 
216 	/* Read everything, write non-deleted ones to new_fd */
217 	while (read_all(old_fd, &hdr, sizeof(hdr))) {
218 		size_t msglen;
219 		u8 *msg;
220 
221 		msglen = (be32_to_cpu(hdr.len) & GOSSIP_STORE_LEN_MASK);
222 		msg = tal_arr(NULL, u8, msglen);
223 		if (!read_all(old_fd, msg, msglen)) {
224 			status_broken("gossip_store_compact_offline: reading msg len %zu from store: %s",
225 				      msglen, strerror(errno));
226 			tal_free(msg);
227 			goto close_and_delete;
228 		}
229 
230 		if (be32_to_cpu(hdr.len) & GOSSIP_STORE_LEN_DELETED_BIT) {
231 			deleted++;
232 			tal_free(msg);
233 			continue;
234 		}
235 
236 		if (oldversion != version) {
237 			if (!upgrade_field(oldversion, rstate, &msg)) {
238 				tal_free(msg);
239 				goto close_and_delete;
240 			}
241 
242 			/* Recalc msglen and header */
243 			msglen = tal_bytelen(msg);
244 			hdr.len = cpu_to_be32(msglen);
245 			hdr.crc = cpu_to_be32(crc32c(be32_to_cpu(hdr.timestamp),
246 						      msg, msglen));
247 		}
248 
249 		/* Don't write out old tombstones */
250 		if (fromwire_peektype(msg) == WIRE_GOSSIP_STORE_DELETE_CHAN) {
251 			deleted++;
252 			tal_free(msg);
253 			continue;
254 		}
255 
256 		if (!write_all(new_fd, &hdr, sizeof(hdr))
257 		    || !write_all(new_fd, msg, msglen)) {
258 			status_broken("gossip_store_compact_offline: writing msg len %zu to new store: %s",
259 				      msglen, strerror(errno));
260 			tal_free(msg);
261 			goto close_and_delete;
262 		}
263 		tal_free(msg);
264 		count++;
265 	}
266 	if (close(new_fd) != 0) {
267 		status_broken("gossip_store_compact_offline: closing new store: %s",
268 			      strerror(errno));
269 		goto close_old;
270 	}
271 	if (rename(GOSSIP_STORE_TEMP_FILENAME, GOSSIP_STORE_FILENAME) != 0) {
272 		status_broken("gossip_store_compact_offline: rename failed: %s",
273 			      strerror(errno));
274 	}
275 
276 	/* Create end marker now new file exists. */
277 	oldlen = lseek(old_fd, SEEK_END, 0);
278 	newlen = lseek(new_fd, SEEK_END, 0);
279 	append_msg(old_fd, towire_gossip_store_ended(tmpctx, newlen),
280 		   0, false, &oldlen);
281 	close(old_fd);
282 	status_debug("gossip_store_compact_offline: %zu deleted, %zu copied",
283 		     deleted, count);
284 	return st.st_mtime;
285 
286 close_and_delete:
287 	close(new_fd);
288 close_old:
289 	close(old_fd);
290 	unlink(GOSSIP_STORE_TEMP_FILENAME);
291 	return 0;
292 }
293 
gossip_store_new(struct routing_state * rstate,struct list_head * peers)294 struct gossip_store *gossip_store_new(struct routing_state *rstate,
295 				      struct list_head *peers)
296 {
297 	struct gossip_store *gs = tal(rstate, struct gossip_store);
298 	gs->count = gs->deleted = 0;
299 	gs->writable = true;
300 	gs->timestamp = gossip_store_compact_offline(rstate);
301 	gs->fd = open(GOSSIP_STORE_FILENAME, O_RDWR|O_CREAT, 0600);
302 	if (gs->fd < 0)
303 		status_failed(STATUS_FAIL_INTERNAL_ERROR,
304 			      "Opening gossip_store store: %s",
305 			      strerror(errno));
306 	gs->rstate = rstate;
307 	gs->disable_compaction = false;
308 	gs->len = sizeof(gs->version);
309 	gs->peers = peers;
310 
311 	tal_add_destructor(gs, gossip_store_destroy);
312 
313 	/* Try to read the version, write it if this is a new file, or truncate
314 	 * if the version doesn't match */
315 	if (read(gs->fd, &gs->version, sizeof(gs->version))
316 	    == sizeof(gs->version)) {
317 		/* Version match?  All good */
318 		if (gs->version == GOSSIP_STORE_VERSION)
319 			return gs;
320 
321 		status_unusual("Gossip store version %u not %u: removing",
322 			       gs->version, GOSSIP_STORE_VERSION);
323 		if (ftruncate(gs->fd, 0) != 0)
324 			status_failed(STATUS_FAIL_INTERNAL_ERROR,
325 				      "Truncating store: %s", strerror(errno));
326 		/* Subtle: we are at offset 1, move back to start! */
327 		if (lseek(gs->fd, 0, SEEK_SET) != 0)
328 			status_failed(STATUS_FAIL_INTERNAL_ERROR,
329 				      "Seeking to start of store: %s",
330 				      strerror(errno));
331 	}
332 	/* Empty file, write version byte */
333 	gs->version = GOSSIP_STORE_VERSION;
334 	if (write(gs->fd, &gs->version, sizeof(gs->version))
335 	    != sizeof(gs->version))
336 		status_failed(STATUS_FAIL_INTERNAL_ERROR,
337 			      "Writing version to store: %s", strerror(errno));
338 	return gs;
339 }
340 
341 /* Returns bytes transferred, or 0 on error */
transfer_store_msg(int from_fd,size_t from_off,int to_fd,size_t to_off,int * type)342 static size_t transfer_store_msg(int from_fd, size_t from_off,
343 				 int to_fd, size_t to_off,
344 				 int *type)
345 {
346 	struct gossip_hdr hdr;
347 	u32 msglen;
348 	u8 *msg;
349 	const u8 *p;
350 	size_t tmplen;
351 
352 	*type = -1;
353 	if (pread(from_fd, &hdr, sizeof(hdr), from_off) != sizeof(hdr)) {
354 		status_broken("Failed reading header from to gossip store @%zu"
355 			      ": %s",
356 			      from_off, strerror(errno));
357 		return 0;
358 	}
359 
360 	msglen = be32_to_cpu(hdr.len);
361 	if (msglen & GOSSIP_STORE_LEN_DELETED_BIT) {
362 		status_broken("Can't transfer deleted msg from gossip store @%zu",
363 			      from_off);
364 		return 0;
365 	}
366 
367 	/* Ignore any non-length bits (e.g. push) */
368 	msglen &= GOSSIP_STORE_LEN_MASK;
369 
370 	/* FIXME: Reuse buffer? */
371 	msg = tal_arr(tmpctx, u8, sizeof(hdr) + msglen);
372 	memcpy(msg, &hdr, sizeof(hdr));
373 	if (pread(from_fd, msg + sizeof(hdr), msglen, from_off + sizeof(hdr))
374 	    != msglen) {
375 		status_broken("Failed reading %u from to gossip store @%zu"
376 			      ": %s",
377 			      msglen, from_off, strerror(errno));
378 		return 0;
379 	}
380 
381 	if (pwrite(to_fd, msg, msglen + sizeof(hdr), to_off)
382 	    != msglen + sizeof(hdr)) {
383 		status_broken("Failed writing to gossip store: %s",
384 			      strerror(errno));
385 		return 0;
386 	}
387 
388 	/* Can't use peektype here, since we have header on front */
389 	p = msg + sizeof(hdr);
390 	tmplen = msglen;
391 	*type = fromwire_u16(&p, &tmplen);
392 	if (!p)
393 		*type = -1;
394 	tal_free(msg);
395 	return sizeof(hdr) + msglen;
396 }
397 
398 /* We keep a htable map of old gossip_store offsets to new ones. */
399 struct offset_map {
400 	size_t from, to;
401 };
402 
offset_map_key(const struct offset_map * omap)403 static size_t offset_map_key(const struct offset_map *omap)
404 {
405 	return omap->from;
406 }
407 
hash_offset(size_t from)408 static size_t hash_offset(size_t from)
409 {
410 	/* Crappy fast hash is "good enough" */
411 	return (from >> 5) ^ from;
412 }
413 
offset_map_eq(const struct offset_map * omap,const size_t from)414 static bool offset_map_eq(const struct offset_map *omap, const size_t from)
415 {
416 	return omap->from == from;
417 }
418 HTABLE_DEFINE_TYPE(struct offset_map,
419 		   offset_map_key, hash_offset, offset_map_eq, offmap);
420 
move_broadcast(struct offmap * offmap,struct broadcastable * bcast,const char * what)421 static void move_broadcast(struct offmap *offmap,
422 			   struct broadcastable *bcast,
423 			   const char *what)
424 {
425 	struct offset_map *omap;
426 
427 	if (!bcast->index)
428 		return;
429 
430 	omap = offmap_get(offmap, bcast->index);
431 	if (!omap)
432 		status_failed(STATUS_FAIL_INTERNAL_ERROR,
433 			      "Could not relocate %s at offset %u",
434 			      what, bcast->index);
435 	bcast->index = omap->to;
436 	offmap_del(offmap, omap);
437 }
438 
destroy_offmap(struct offmap * offmap)439 static void destroy_offmap(struct offmap *offmap)
440 {
441 	offmap_clear(offmap);
442 }
443 
444 /**
445  * Rewrite the on-disk gossip store, compacting it along the way
446  *
447  * Creates a new file, writes all the updates from the `broadcast_state`, and
448  * then atomically swaps the files.
449  */
gossip_store_compact(struct gossip_store * gs)450 bool gossip_store_compact(struct gossip_store *gs)
451 {
452 	size_t count = 0, deleted = 0;
453 	int fd;
454 	u64 off, len = sizeof(gs->version), idx;
455 	struct offmap *offmap;
456 	struct gossip_hdr hdr;
457 	struct offmap_iter oit;
458 	struct node_map_iter nit;
459 	struct offset_map *omap;
460 
461 	if (gs->disable_compaction)
462 		return false;
463 
464 	status_debug(
465 	    "Compacting gossip_store with %zu entries, %zu of which are stale",
466 	    gs->count, gs->deleted);
467 
468 	fd = open(GOSSIP_STORE_TEMP_FILENAME, O_RDWR|O_TRUNC|O_CREAT, 0600);
469 
470 	if (fd < 0) {
471 		status_broken(
472 		    "Could not open file for gossip_store compaction");
473 		goto disable;
474 	}
475 
476 	if (write(fd, &gs->version, sizeof(gs->version))
477 	    != sizeof(gs->version)) {
478 		status_broken("Writing version to store: %s", strerror(errno));
479 		goto unlink_disable;
480 	}
481 
482 	/* Walk old file, copy everything and remember new offsets. */
483 	offmap = tal(tmpctx, struct offmap);
484 	offmap_init_sized(offmap, gs->count);
485 	tal_add_destructor(offmap, destroy_offmap);
486 
487 	/* Start by writing all channel announcements and updates. */
488 	off = 1;
489 	while (pread(gs->fd, &hdr, sizeof(hdr), off) == sizeof(hdr)) {
490 		u32 msglen, wlen;
491 		int msgtype;
492 
493 		msglen = (be32_to_cpu(hdr.len) & GOSSIP_STORE_LEN_MASK);
494 		if (be32_to_cpu(hdr.len) & GOSSIP_STORE_LEN_DELETED_BIT) {
495 			off += sizeof(hdr) + msglen;
496 			deleted++;
497 			continue;
498 		}
499 
500 		count++;
501 		wlen = transfer_store_msg(gs->fd, off, fd, len, &msgtype);
502 		if (wlen == 0)
503 			goto unlink_disable;
504 
505 		/* We track location of all these message types. */
506 		if (msgtype == WIRE_GOSSIP_STORE_PRIVATE_CHANNEL
507 		    || msgtype == WIRE_GOSSIP_STORE_PRIVATE_UPDATE
508 		    || msgtype == WIRE_CHANNEL_ANNOUNCEMENT
509 		    || msgtype == WIRE_CHANNEL_UPDATE
510 		    || msgtype == WIRE_NODE_ANNOUNCEMENT) {
511 			omap = tal(offmap, struct offset_map);
512 			omap->from = off;
513 			omap->to = len;
514 			offmap_add(offmap, omap);
515 		}
516 		len += wlen;
517 		off += wlen;
518 	}
519 
520 	/* OK, now we've written file successfully, we can move broadcasts. */
521 	/* Remap node announcements. */
522 	for (struct node *n = node_map_first(gs->rstate->nodes, &nit);
523 	     n;
524 	     n = node_map_next(gs->rstate->nodes, &nit)) {
525 		move_broadcast(offmap, &n->bcast, "node_announce");
526 	}
527 
528 	/* Remap channel announcements and updates */
529 	for (struct chan *c = uintmap_first(&gs->rstate->chanmap, &idx);
530 	     c;
531 	     c = uintmap_after(&gs->rstate->chanmap, &idx)) {
532 		move_broadcast(offmap, &c->bcast, "channel_announce");
533 		move_broadcast(offmap, &c->half[0].bcast, "channel_update");
534 		move_broadcast(offmap, &c->half[1].bcast, "channel_update");
535 	}
536 
537 	/* That should be everything. */
538 	omap = offmap_first(offmap, &oit);
539 	if (omap)
540 		status_failed(STATUS_FAIL_INTERNAL_ERROR,
541 			      "gossip_store: Entry at %zu->%zu not updated?",
542 			      omap->from, omap->to);
543 
544 	if (count != gs->count - gs->deleted)
545 		status_failed(STATUS_FAIL_INTERNAL_ERROR,
546 			      "gossip_store: Expected %zu msgs in new"
547 			      " gossip store, got %zu",
548 			      gs->count - gs->deleted, count);
549 
550 	if (deleted != gs->deleted)
551 		status_failed(STATUS_FAIL_INTERNAL_ERROR,
552 			      "gossip_store: Expected %zu deleted msgs in old"
553 			      " gossip store, got %zu",
554 			      gs->deleted, deleted);
555 
556 	if (rename(GOSSIP_STORE_TEMP_FILENAME, GOSSIP_STORE_FILENAME) == -1)
557 		status_failed(STATUS_FAIL_INTERNAL_ERROR,
558 			      "Error swapping compacted gossip_store into place:"
559 			      " %s",
560 			      strerror(errno));
561 
562 	status_debug(
563 	    "Compaction completed: dropped %zu messages, new count %zu, len %"PRIu64,
564 	    deleted, count, len);
565 
566 	/* Write end marker now new one is ready */
567 	append_msg(gs->fd, towire_gossip_store_ended(tmpctx, len),
568 		   0, false, &gs->len);
569 
570 	gs->count = count;
571 	gs->deleted = 0;
572 	gs->len = len;
573 	close(gs->fd);
574 	gs->fd = fd;
575 
576 	return true;
577 
578 unlink_disable:
579 	unlink(GOSSIP_STORE_TEMP_FILENAME);
580 disable:
581 	status_debug("Encountered an error while compacting, disabling "
582 		     "future compactions.");
583 	gs->disable_compaction = true;
584 	return false;
585 }
586 
gossip_store_add(struct gossip_store * gs,const u8 * gossip_msg,u32 timestamp,bool push,const u8 * addendum)587 u64 gossip_store_add(struct gossip_store *gs, const u8 *gossip_msg,
588 		     u32 timestamp, bool push,
589 		     const u8 *addendum)
590 {
591 	u64 off = gs->len;
592 
593 	/* Should never get here during loading! */
594 	assert(gs->writable);
595 
596 	if (!append_msg(gs->fd, gossip_msg, timestamp, push, &gs->len)) {
597 		status_broken("Failed writing to gossip store: %s",
598 			      strerror(errno));
599 		return 0;
600 	}
601 	if (addendum && !append_msg(gs->fd, addendum, 0, false, &gs->len)) {
602 		status_broken("Failed writing addendum to gossip store: %s",
603 			      strerror(errno));
604 		return 0;
605 	}
606 
607 	gs->count++;
608 	if (addendum)
609 		gs->count++;
610 	return off;
611 }
612 
gossip_store_add_private_update(struct gossip_store * gs,const u8 * update)613 u64 gossip_store_add_private_update(struct gossip_store *gs, const u8 *update)
614 {
615 	/* A local update for an unannounced channel: not broadcastable, but
616 	 * otherwise the same as a normal channel_update */
617 	const u8 *pupdate = towire_gossip_store_private_update(tmpctx, update);
618 	return gossip_store_add(gs, pupdate, 0, false, NULL);
619 }
620 
621 /* Returns index of following entry. */
delete_by_index(struct gossip_store * gs,u32 index,int type)622 static u32 delete_by_index(struct gossip_store *gs, u32 index, int type)
623 {
624 	beint32_t belen;
625 
626 	/* Should never get here during loading! */
627 	assert(gs->writable);
628 
629 	/* Should never try to overwrite version */
630 	assert(index);
631 
632 #if DEVELOPER
633 	const u8 *msg = gossip_store_get(tmpctx, gs, index);
634 	assert(fromwire_peektype(msg) == type);
635 #endif
636 
637 	if (pread(gs->fd, &belen, sizeof(belen), index) != sizeof(belen))
638 		status_failed(STATUS_FAIL_INTERNAL_ERROR,
639 			      "Failed reading len to delete @%u: %s",
640 			      index, strerror(errno));
641 
642 	assert((be32_to_cpu(belen) & GOSSIP_STORE_LEN_DELETED_BIT) == 0);
643 	belen |= cpu_to_be32(GOSSIP_STORE_LEN_DELETED_BIT);
644 	if (pwrite(gs->fd, &belen, sizeof(belen), index) != sizeof(belen))
645 		status_failed(STATUS_FAIL_INTERNAL_ERROR,
646 			      "Failed writing len to delete @%u: %s",
647 			      index, strerror(errno));
648 	gs->deleted++;
649 
650 	return index + sizeof(struct gossip_hdr)
651 		+ (be32_to_cpu(belen) & GOSSIP_STORE_LEN_MASK);
652 }
653 
gossip_store_delete(struct gossip_store * gs,struct broadcastable * bcast,int type)654 void gossip_store_delete(struct gossip_store *gs,
655 			 struct broadcastable *bcast,
656 			 int type)
657 {
658 	u32 next_index;
659 
660 	if (!bcast->index)
661 		return;
662 
663 	next_index = delete_by_index(gs, bcast->index, type);
664 
665 	/* Reset index. */
666 	bcast->index = 0;
667 
668 	/* For a channel_announcement, we need to delete amount too */
669 	if (type == WIRE_CHANNEL_ANNOUNCEMENT)
670 		delete_by_index(gs, next_index,
671 				WIRE_GOSSIP_STORE_CHANNEL_AMOUNT);
672 }
673 
gossip_store_mark_channel_deleted(struct gossip_store * gs,const struct short_channel_id * scid)674 void gossip_store_mark_channel_deleted(struct gossip_store *gs,
675 				       const struct short_channel_id *scid)
676 {
677 	gossip_store_add(gs, towire_gossip_store_delete_chan(tmpctx, scid),
678 			 0, false, NULL);
679 }
680 
gossip_store_get(const tal_t * ctx,struct gossip_store * gs,u64 offset)681 const u8 *gossip_store_get(const tal_t *ctx,
682 			   struct gossip_store *gs,
683 			   u64 offset)
684 {
685 	struct gossip_hdr hdr;
686 	u32 msglen, checksum;
687 	u8 *msg;
688 
689 	if (offset == 0)
690 		status_failed(STATUS_FAIL_INTERNAL_ERROR,
691 			      "gossip_store: can't access offset %"PRIu64,
692 			      offset);
693 	if (pread(gs->fd, &hdr, sizeof(hdr), offset) != sizeof(hdr)) {
694 		status_failed(STATUS_FAIL_INTERNAL_ERROR,
695 			      "gossip_store: can't read hdr offset %"PRIu64
696 			      "/%"PRIu64": %s",
697 			      offset, gs->len, strerror(errno));
698 	}
699 
700 	if (be32_to_cpu(hdr.len) & GOSSIP_STORE_LEN_DELETED_BIT)
701 		status_failed(STATUS_FAIL_INTERNAL_ERROR,
702 			      "gossip_store: get delete entry offset %"PRIu64
703 			      "/%"PRIu64"",
704 			      offset, gs->len);
705 
706 	msglen = (be32_to_cpu(hdr.len) & GOSSIP_STORE_LEN_MASK);
707 	checksum = be32_to_cpu(hdr.crc);
708 	msg = tal_arr(ctx, u8, msglen);
709 	if (pread(gs->fd, msg, msglen, offset + sizeof(hdr)) != msglen)
710 		status_failed(STATUS_FAIL_INTERNAL_ERROR,
711 			      "gossip_store: can't read len %u offset %"PRIu64
712 			      "/%"PRIu64, msglen, offset, gs->len);
713 
714 	if (checksum != crc32c(be32_to_cpu(hdr.timestamp), msg, msglen))
715 		status_failed(STATUS_FAIL_INTERNAL_ERROR,
716 			      "gossip_store: bad checksum offset %"PRIu64": %s",
717 			      offset, tal_hex(tmpctx, msg));
718 
719 	return msg;
720 }
721 
gossip_store_get_private_update(const tal_t * ctx,struct gossip_store * gs,u64 offset)722 const u8 *gossip_store_get_private_update(const tal_t *ctx,
723 					  struct gossip_store *gs,
724 					  u64 offset)
725 {
726 	const u8 *pmsg = gossip_store_get(tmpctx, gs, offset);
727 	u8 *msg;
728 
729 	if (!fromwire_gossip_store_private_update(ctx, pmsg, &msg))
730 		status_failed(STATUS_FAIL_INTERNAL_ERROR,
731 			      "Failed to decode private update @%"PRIu64": %s",
732 			      offset, tal_hex(tmpctx, pmsg));
733 	return msg;
734 }
735 
gossip_store_readonly_fd(struct gossip_store * gs)736 int gossip_store_readonly_fd(struct gossip_store *gs)
737 {
738 	int fd = open(GOSSIP_STORE_FILENAME, O_RDONLY);
739 
740 	/* Skip over version header */
741 	if (fd != -1 && lseek(fd, 1, SEEK_SET) != 1) {
742 		close_noerr(fd);
743 		fd = -1;
744 	}
745 	return fd;
746 }
747 
gossip_store_load(struct routing_state * rstate,struct gossip_store * gs)748 u32 gossip_store_load(struct routing_state *rstate, struct gossip_store *gs)
749 {
750 	struct gossip_hdr hdr;
751 	u32 msglen, checksum;
752 	u8 *msg;
753 	struct amount_sat satoshis;
754 	const char *bad;
755 	size_t stats[] = {0, 0, 0, 0};
756 	struct timeabs start = time_now();
757 	u8 *chan_ann = NULL;
758 	u64 chan_ann_off = 0; /* Spurious gcc-9 (Ubuntu 9-20190402-1ubuntu1) 9.0.1 20190402 (experimental) warning */
759 
760 	gs->writable = false;
761 	while (pread(gs->fd, &hdr, sizeof(hdr), gs->len) == sizeof(hdr)) {
762 		msglen = be32_to_cpu(hdr.len) & GOSSIP_STORE_LEN_MASK;
763 		checksum = be32_to_cpu(hdr.crc);
764 		msg = tal_arr(tmpctx, u8, msglen);
765 
766 		if (pread(gs->fd, msg, msglen, gs->len+sizeof(hdr)) != msglen) {
767 			bad = "gossip_store: truncated file?";
768 			goto corrupt;
769 		}
770 
771 		if (checksum != crc32c(be32_to_cpu(hdr.timestamp), msg, msglen)) {
772 			bad = "Checksum verification failed";
773 			goto badmsg;
774 		}
775 
776 		/* Skip deleted entries */
777 		if (be32_to_cpu(hdr.len) & GOSSIP_STORE_LEN_DELETED_BIT) {
778 			/* Count includes deleted! */
779 			gs->count++;
780 			gs->deleted++;
781 			goto next;
782 		}
783 
784 		switch (fromwire_peektype(msg)) {
785 		case WIRE_GOSSIP_STORE_PRIVATE_CHANNEL:
786 			if (!routing_add_private_channel(rstate, NULL, msg,
787 							 gs->len)) {
788 				bad = "Bad add_private_channel";
789 				goto badmsg;
790 			}
791 			stats[0]++;
792 			break;
793 		case WIRE_GOSSIP_STORE_CHANNEL_AMOUNT:
794 			if (!fromwire_gossip_store_channel_amount(msg,
795 								  &satoshis)) {
796 				bad = "Bad gossip_store_channel_amount";
797 				goto badmsg;
798 			}
799 			/* Previous channel_announcement may have been deleted */
800 			if (!chan_ann)
801 				break;
802 			if (!routing_add_channel_announcement(rstate,
803 							      take(chan_ann),
804 							      satoshis,
805 							      chan_ann_off,
806 							      NULL)) {
807 				bad = "Bad channel_announcement";
808 				goto badmsg;
809 			}
810 			chan_ann = NULL;
811 			stats[0]++;
812 			break;
813 		case WIRE_CHANNEL_ANNOUNCEMENT:
814 			if (chan_ann) {
815 				bad = "channel_announcement without amount";
816 				goto badmsg;
817 			}
818 			/* Save for channel_amount (next msg) */
819 			chan_ann = tal_steal(gs, msg);
820 			chan_ann_off = gs->len;
821 			break;
822 		case WIRE_GOSSIP_STORE_PRIVATE_UPDATE:
823 			if (!fromwire_gossip_store_private_update(tmpctx, msg, &msg)) {
824 				bad = "invalid gossip_store_private_update";
825 				goto badmsg;
826 			}
827 			/* fall thru */
828 		case WIRE_CHANNEL_UPDATE:
829 			if (!routing_add_channel_update(rstate,
830 							take(msg), gs->len,
831 							NULL, false)) {
832 				bad = "Bad channel_update";
833 				goto badmsg;
834 			}
835 			stats[1]++;
836 			break;
837 		case WIRE_NODE_ANNOUNCEMENT:
838 			if (!routing_add_node_announcement(rstate,
839 							   take(msg), gs->len,
840 							   NULL, NULL)) {
841 				bad = "Bad node_announcement";
842 				goto badmsg;
843 			}
844 			stats[2]++;
845 			break;
846 		default:
847 			bad = "Unknown message";
848 			goto badmsg;
849 		}
850 
851 		gs->count++;
852 	next:
853 		gs->len += sizeof(hdr) + msglen;
854 		clean_tmpctx();
855 	}
856 
857 	if (chan_ann) {
858 		bad = "dangling channel_announcement";
859 		goto corrupt;
860 	}
861 
862 	bad = unfinalized_entries(tmpctx, rstate);
863 	if (bad)
864 		goto corrupt;
865 
866 	goto out;
867 
868 badmsg:
869 	bad = tal_fmt(tmpctx, "%s (%s)", bad, tal_hex(tmpctx, msg));
870 
871 corrupt:
872 	status_broken("gossip_store: %s. Moving to %s.corrupt and truncating",
873 		      bad, GOSSIP_STORE_FILENAME);
874 
875 	/* FIXME: Debug partial truncate case. */
876 	rename(GOSSIP_STORE_FILENAME, GOSSIP_STORE_FILENAME ".corrupt");
877 	close(gs->fd);
878 	gs->fd = open(GOSSIP_STORE_FILENAME, O_RDWR|O_TRUNC|O_CREAT, 0600);
879 	if (gs->fd < 0 || !write_all(gs->fd, &gs->version, sizeof(gs->version)))
880 		status_failed(STATUS_FAIL_INTERNAL_ERROR,
881 			      "Truncating new store file: %s", strerror(errno));
882 	remove_all_gossip(rstate);
883 	gs->count = gs->deleted = 0;
884 	gs->len = 1;
885 	gs->timestamp = 0;
886 out:
887 	gs->writable = true;
888 	status_debug("total store load time: %"PRIu64" msec",
889 		     time_to_msec(time_between(time_now(), start)));
890 	status_debug("gossip_store: Read %zu/%zu/%zu/%zu cannounce/cupdate/nannounce/cdelete from store (%zu deleted) in %"PRIu64" bytes",
891 		     stats[0], stats[1], stats[2], stats[3], gs->deleted,
892 		     gs->len);
893 
894 	return gs->timestamp;
895 }
896