1 /*-------------------------------------------------------------------------
2  *
3  * pgoutput.c
4  *		Logical Replication output plugin
5  *
6  * Copyright (c) 2012-2018, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *		  src/backend/replication/pgoutput/pgoutput.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "catalog/pg_publication.h"
16 
17 #include "replication/logical.h"
18 #include "replication/logicalproto.h"
19 #include "replication/origin.h"
20 #include "replication/pgoutput.h"
21 
22 #include "utils/inval.h"
23 #include "utils/int8.h"
24 #include "utils/memutils.h"
25 #include "utils/syscache.h"
26 #include "utils/varlena.h"
27 
28 PG_MODULE_MAGIC;
29 
30 extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
31 
32 static void pgoutput_startup(LogicalDecodingContext *ctx,
33 				 OutputPluginOptions *opt, bool is_init);
34 static void pgoutput_shutdown(LogicalDecodingContext *ctx);
35 static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
36 				   ReorderBufferTXN *txn);
37 static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
38 					ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
39 static void pgoutput_change(LogicalDecodingContext *ctx,
40 				ReorderBufferTXN *txn, Relation rel,
41 				ReorderBufferChange *change);
42 static void pgoutput_truncate(LogicalDecodingContext *ctx,
43 				  ReorderBufferTXN *txn, int nrelations, Relation relations[],
44 				  ReorderBufferChange *change);
45 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
46 					   RepOriginId origin_id);
47 
48 static bool publications_valid;
49 
50 static List *LoadPublications(List *pubnames);
51 static void publication_invalidation_cb(Datum arg, int cacheid,
52 							uint32 hashvalue);
53 
54 /* Entry in the map used to remember which relation schemas we sent. */
55 typedef struct RelationSyncEntry
56 {
57 	Oid			relid;			/* relation oid */
58 	bool		schema_sent;	/* did we send the schema? */
59 	bool		replicate_valid;
60 	PublicationActions pubactions;
61 } RelationSyncEntry;
62 
63 /* Map used to remember which relation schemas we sent. */
64 static HTAB *RelationSyncCache = NULL;
65 
66 static void init_rel_sync_cache(MemoryContext decoding_context);
67 static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
68 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
69 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
70 							  uint32 hashvalue);
71 
72 /*
73  * Specify output plugin callbacks
74  */
75 void
_PG_output_plugin_init(OutputPluginCallbacks * cb)76 _PG_output_plugin_init(OutputPluginCallbacks *cb)
77 {
78 	AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
79 
80 	cb->startup_cb = pgoutput_startup;
81 	cb->begin_cb = pgoutput_begin_txn;
82 	cb->change_cb = pgoutput_change;
83 	cb->truncate_cb = pgoutput_truncate;
84 	cb->commit_cb = pgoutput_commit_txn;
85 	cb->filter_by_origin_cb = pgoutput_origin_filter;
86 	cb->shutdown_cb = pgoutput_shutdown;
87 }
88 
89 static void
parse_output_parameters(List * options,uint32 * protocol_version,List ** publication_names)90 parse_output_parameters(List *options, uint32 *protocol_version,
91 						List **publication_names)
92 {
93 	ListCell   *lc;
94 	bool		protocol_version_given = false;
95 	bool		publication_names_given = false;
96 
97 	foreach(lc, options)
98 	{
99 		DefElem    *defel = (DefElem *) lfirst(lc);
100 
101 		Assert(defel->arg == NULL || IsA(defel->arg, String));
102 
103 		/* Check each param, whether or not we recognize it */
104 		if (strcmp(defel->defname, "proto_version") == 0)
105 		{
106 			int64		parsed;
107 
108 			if (protocol_version_given)
109 				ereport(ERROR,
110 						(errcode(ERRCODE_SYNTAX_ERROR),
111 						 errmsg("conflicting or redundant options")));
112 			protocol_version_given = true;
113 
114 			if (!scanint8(strVal(defel->arg), true, &parsed))
115 				ereport(ERROR,
116 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
117 						 errmsg("invalid proto_version")));
118 
119 			if (parsed > PG_UINT32_MAX || parsed < 0)
120 				ereport(ERROR,
121 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
122 						 errmsg("proto_version \"%s\" out of range",
123 								strVal(defel->arg))));
124 
125 			*protocol_version = (uint32) parsed;
126 		}
127 		else if (strcmp(defel->defname, "publication_names") == 0)
128 		{
129 			if (publication_names_given)
130 				ereport(ERROR,
131 						(errcode(ERRCODE_SYNTAX_ERROR),
132 						 errmsg("conflicting or redundant options")));
133 			publication_names_given = true;
134 
135 			if (!SplitIdentifierString(strVal(defel->arg), ',',
136 									   publication_names))
137 				ereport(ERROR,
138 						(errcode(ERRCODE_INVALID_NAME),
139 						 errmsg("invalid publication_names syntax")));
140 		}
141 		else
142 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
143 	}
144 }
145 
146 /*
147  * Initialize this plugin
148  */
149 static void
pgoutput_startup(LogicalDecodingContext * ctx,OutputPluginOptions * opt,bool is_init)150 pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
151 				 bool is_init)
152 {
153 	PGOutputData *data = palloc0(sizeof(PGOutputData));
154 
155 	/* Create our memory context for private allocations. */
156 	data->context = AllocSetContextCreate(ctx->context,
157 										  "logical replication output context",
158 										  ALLOCSET_DEFAULT_SIZES);
159 
160 	ctx->output_plugin_private = data;
161 
162 	/* This plugin uses binary protocol. */
163 	opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
164 
165 	/*
166 	 * This is replication start and not slot initialization.
167 	 *
168 	 * Parse and validate options passed by the client.
169 	 */
170 	if (!is_init)
171 	{
172 		/* Parse the params and ERROR if we see any we don't recognize */
173 		parse_output_parameters(ctx->output_plugin_options,
174 								&data->protocol_version,
175 								&data->publication_names);
176 
177 		/* Check if we support requested protocol */
178 		if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
179 			ereport(ERROR,
180 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
181 					 errmsg("client sent proto_version=%d but we only support protocol %d or lower",
182 							data->protocol_version, LOGICALREP_PROTO_VERSION_NUM)));
183 
184 		if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
185 			ereport(ERROR,
186 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
187 					 errmsg("client sent proto_version=%d but we only support protocol %d or higher",
188 							data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
189 
190 		if (list_length(data->publication_names) < 1)
191 			ereport(ERROR,
192 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
193 					 errmsg("publication_names parameter missing")));
194 
195 		/* Init publication state. */
196 		data->publications = NIL;
197 		publications_valid = false;
198 		CacheRegisterSyscacheCallback(PUBLICATIONOID,
199 									  publication_invalidation_cb,
200 									  (Datum) 0);
201 
202 		/* Initialize relation schema cache. */
203 		init_rel_sync_cache(CacheMemoryContext);
204 	}
205 }
206 
207 /*
208  * BEGIN callback
209  */
210 static void
pgoutput_begin_txn(LogicalDecodingContext * ctx,ReorderBufferTXN * txn)211 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
212 {
213 	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
214 
215 	OutputPluginPrepareWrite(ctx, !send_replication_origin);
216 	logicalrep_write_begin(ctx->out, txn);
217 
218 	if (send_replication_origin)
219 	{
220 		char	   *origin;
221 
222 		/* Message boundary */
223 		OutputPluginWrite(ctx, false);
224 		OutputPluginPrepareWrite(ctx, true);
225 
226 		/*----------
227 		 * XXX: which behaviour do we want here?
228 		 *
229 		 * Alternatives:
230 		 *	- don't send origin message if origin name not found
231 		 *	  (that's what we do now)
232 		 *	- throw error - that will break replication, not good
233 		 *	- send some special "unknown" origin
234 		 *----------
235 		 */
236 		if (replorigin_by_oid(txn->origin_id, true, &origin))
237 			logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
238 	}
239 
240 	OutputPluginWrite(ctx, true);
241 }
242 
243 /*
244  * COMMIT callback
245  */
246 static void
pgoutput_commit_txn(LogicalDecodingContext * ctx,ReorderBufferTXN * txn,XLogRecPtr commit_lsn)247 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
248 					XLogRecPtr commit_lsn)
249 {
250 	OutputPluginUpdateProgress(ctx);
251 
252 	OutputPluginPrepareWrite(ctx, true);
253 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
254 	OutputPluginWrite(ctx, true);
255 }
256 
257 /*
258  * Write the relation schema if the current schema hasn't been sent yet.
259  */
260 static void
maybe_send_schema(LogicalDecodingContext * ctx,Relation relation,RelationSyncEntry * relentry)261 maybe_send_schema(LogicalDecodingContext *ctx,
262 				  Relation relation, RelationSyncEntry *relentry)
263 {
264 	if (!relentry->schema_sent)
265 	{
266 		TupleDesc	desc;
267 		int			i;
268 
269 		desc = RelationGetDescr(relation);
270 
271 		/*
272 		 * Write out type info if needed.  We do that only for user-created
273 		 * types.  We use FirstBootstrapObjectId as the cutoff, so that we only
274 		 * consider objects with hand-assigned OIDs to be "built in", not for
275 		 * instance any function or type defined in the information_schema.
276 		 * This is important because only hand-assigned OIDs can be expected
277 		 * to remain stable across major versions.
278 		 */
279 		for (i = 0; i < desc->natts; i++)
280 		{
281 			Form_pg_attribute att = TupleDescAttr(desc, i);
282 
283 			if (att->attisdropped)
284 				continue;
285 
286 			if (att->atttypid < FirstBootstrapObjectId)
287 				continue;
288 
289 			OutputPluginPrepareWrite(ctx, false);
290 			logicalrep_write_typ(ctx->out, att->atttypid);
291 			OutputPluginWrite(ctx, false);
292 		}
293 
294 		OutputPluginPrepareWrite(ctx, false);
295 		logicalrep_write_rel(ctx->out, relation);
296 		OutputPluginWrite(ctx, false);
297 		relentry->schema_sent = true;
298 	}
299 }
300 
301 /*
302  * Sends the decoded DML over wire.
303  */
304 static void
pgoutput_change(LogicalDecodingContext * ctx,ReorderBufferTXN * txn,Relation relation,ReorderBufferChange * change)305 pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
306 				Relation relation, ReorderBufferChange *change)
307 {
308 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
309 	MemoryContext old;
310 	RelationSyncEntry *relentry;
311 
312 	if (!is_publishable_relation(relation))
313 		return;
314 
315 	relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
316 
317 	/* First check the table filter */
318 	switch (change->action)
319 	{
320 		case REORDER_BUFFER_CHANGE_INSERT:
321 			if (!relentry->pubactions.pubinsert)
322 				return;
323 			break;
324 		case REORDER_BUFFER_CHANGE_UPDATE:
325 			if (!relentry->pubactions.pubupdate)
326 				return;
327 			break;
328 		case REORDER_BUFFER_CHANGE_DELETE:
329 			if (!relentry->pubactions.pubdelete)
330 				return;
331 			break;
332 		default:
333 			Assert(false);
334 	}
335 
336 	/* Avoid leaking memory by using and resetting our own context */
337 	old = MemoryContextSwitchTo(data->context);
338 
339 	maybe_send_schema(ctx, relation, relentry);
340 
341 	/* Send the data */
342 	switch (change->action)
343 	{
344 		case REORDER_BUFFER_CHANGE_INSERT:
345 			OutputPluginPrepareWrite(ctx, true);
346 			logicalrep_write_insert(ctx->out, relation,
347 									&change->data.tp.newtuple->tuple);
348 			OutputPluginWrite(ctx, true);
349 			break;
350 		case REORDER_BUFFER_CHANGE_UPDATE:
351 			{
352 				HeapTuple	oldtuple = change->data.tp.oldtuple ?
353 				&change->data.tp.oldtuple->tuple : NULL;
354 
355 				OutputPluginPrepareWrite(ctx, true);
356 				logicalrep_write_update(ctx->out, relation, oldtuple,
357 										&change->data.tp.newtuple->tuple);
358 				OutputPluginWrite(ctx, true);
359 				break;
360 			}
361 		case REORDER_BUFFER_CHANGE_DELETE:
362 			if (change->data.tp.oldtuple)
363 			{
364 				OutputPluginPrepareWrite(ctx, true);
365 				logicalrep_write_delete(ctx->out, relation,
366 										&change->data.tp.oldtuple->tuple);
367 				OutputPluginWrite(ctx, true);
368 			}
369 			else
370 				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
371 			break;
372 		default:
373 			Assert(false);
374 	}
375 
376 	/* Cleanup */
377 	MemoryContextSwitchTo(old);
378 	MemoryContextReset(data->context);
379 }
380 
381 static void
pgoutput_truncate(LogicalDecodingContext * ctx,ReorderBufferTXN * txn,int nrelations,Relation relations[],ReorderBufferChange * change)382 pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
383 				  int nrelations, Relation relations[], ReorderBufferChange *change)
384 {
385 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
386 	MemoryContext old;
387 	RelationSyncEntry *relentry;
388 	int			i;
389 	int			nrelids;
390 	Oid		   *relids;
391 
392 	old = MemoryContextSwitchTo(data->context);
393 
394 	relids = palloc0(nrelations * sizeof(Oid));
395 	nrelids = 0;
396 
397 	for (i = 0; i < nrelations; i++)
398 	{
399 		Relation	relation = relations[i];
400 		Oid			relid = RelationGetRelid(relation);
401 
402 		if (!is_publishable_relation(relation))
403 			continue;
404 
405 		relentry = get_rel_sync_entry(data, relid);
406 
407 		if (!relentry->pubactions.pubtruncate)
408 			continue;
409 
410 		relids[nrelids++] = relid;
411 		maybe_send_schema(ctx, relation, relentry);
412 	}
413 
414 	if (nrelids > 0)
415 	{
416 		OutputPluginPrepareWrite(ctx, true);
417 		logicalrep_write_truncate(ctx->out,
418 								  nrelids,
419 								  relids,
420 								  change->data.truncate.cascade,
421 								  change->data.truncate.restart_seqs);
422 		OutputPluginWrite(ctx, true);
423 	}
424 
425 	MemoryContextSwitchTo(old);
426 	MemoryContextReset(data->context);
427 }
428 
429 /*
430  * Currently we always forward.
431  */
432 static bool
pgoutput_origin_filter(LogicalDecodingContext * ctx,RepOriginId origin_id)433 pgoutput_origin_filter(LogicalDecodingContext *ctx,
434 					   RepOriginId origin_id)
435 {
436 	return false;
437 }
438 
439 /*
440  * Shutdown the output plugin.
441  *
442  * Note, we don't need to clean the data->context as it's child context
443  * of the ctx->context so it will be cleaned up by logical decoding machinery.
444  */
445 static void
pgoutput_shutdown(LogicalDecodingContext * ctx)446 pgoutput_shutdown(LogicalDecodingContext *ctx)
447 {
448 	if (RelationSyncCache)
449 	{
450 		hash_destroy(RelationSyncCache);
451 		RelationSyncCache = NULL;
452 	}
453 }
454 
455 /*
456  * Load publications from the list of publication names.
457  */
458 static List *
LoadPublications(List * pubnames)459 LoadPublications(List *pubnames)
460 {
461 	List	   *result = NIL;
462 	ListCell   *lc;
463 
464 	foreach(lc, pubnames)
465 	{
466 		char	   *pubname = (char *) lfirst(lc);
467 		Publication *pub = GetPublicationByName(pubname, false);
468 
469 		result = lappend(result, pub);
470 	}
471 
472 	return result;
473 }
474 
475 /*
476  * Publication cache invalidation callback.
477  */
478 static void
publication_invalidation_cb(Datum arg,int cacheid,uint32 hashvalue)479 publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
480 {
481 	publications_valid = false;
482 
483 	/*
484 	 * Also invalidate per-relation cache so that next time the filtering info
485 	 * is checked it will be updated with the new publication settings.
486 	 */
487 	rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
488 }
489 
490 /*
491  * Initialize the relation schema sync cache for a decoding session.
492  *
493  * The hash table is destroyed at the end of a decoding session. While
494  * relcache invalidations still exist and will still be invoked, they
495  * will just see the null hash table global and take no action.
496  */
497 static void
init_rel_sync_cache(MemoryContext cachectx)498 init_rel_sync_cache(MemoryContext cachectx)
499 {
500 	HASHCTL		ctl;
501 	MemoryContext old_ctxt;
502 
503 	if (RelationSyncCache != NULL)
504 		return;
505 
506 	/* Make a new hash table for the cache */
507 	MemSet(&ctl, 0, sizeof(ctl));
508 	ctl.keysize = sizeof(Oid);
509 	ctl.entrysize = sizeof(RelationSyncEntry);
510 	ctl.hcxt = cachectx;
511 
512 	old_ctxt = MemoryContextSwitchTo(cachectx);
513 	RelationSyncCache = hash_create("logical replication output relation cache",
514 									128, &ctl,
515 									HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
516 	(void) MemoryContextSwitchTo(old_ctxt);
517 
518 	Assert(RelationSyncCache != NULL);
519 
520 	CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
521 	CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
522 								  rel_sync_cache_publication_cb,
523 								  (Datum) 0);
524 }
525 
526 /*
527  * Find or create entry in the relation schema cache.
528  */
529 static RelationSyncEntry *
get_rel_sync_entry(PGOutputData * data,Oid relid)530 get_rel_sync_entry(PGOutputData *data, Oid relid)
531 {
532 	RelationSyncEntry *entry;
533 	bool		found;
534 	MemoryContext oldctx;
535 
536 	Assert(RelationSyncCache != NULL);
537 
538 	/* Find cached function info, creating if not found */
539 	oldctx = MemoryContextSwitchTo(CacheMemoryContext);
540 	entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
541 											  (void *) &relid,
542 											  HASH_ENTER, &found);
543 	MemoryContextSwitchTo(oldctx);
544 	Assert(entry != NULL);
545 
546 	/* Not found means schema wasn't sent */
547 	if (!found || !entry->replicate_valid)
548 	{
549 		List	   *pubids = GetRelationPublications(relid);
550 		ListCell   *lc;
551 
552 		/* Reload publications if needed before use. */
553 		if (!publications_valid)
554 		{
555 			oldctx = MemoryContextSwitchTo(CacheMemoryContext);
556 			if (data->publications)
557 				list_free_deep(data->publications);
558 
559 			data->publications = LoadPublications(data->publication_names);
560 			MemoryContextSwitchTo(oldctx);
561 			publications_valid = true;
562 		}
563 
564 		/*
565 		 * Build publication cache. We can't use one provided by relcache as
566 		 * relcache considers all publications given relation is in, but here
567 		 * we only need to consider ones that the subscriber requested.
568 		 */
569 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
570 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
571 
572 		foreach(lc, data->publications)
573 		{
574 			Publication *pub = lfirst(lc);
575 
576 			if (pub->alltables || list_member_oid(pubids, pub->oid))
577 			{
578 				entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
579 				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
580 				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
581 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
582 			}
583 
584 			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
585 				entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
586 				break;
587 		}
588 
589 		list_free(pubids);
590 
591 		entry->replicate_valid = true;
592 	}
593 
594 	if (!found)
595 		entry->schema_sent = false;
596 
597 	return entry;
598 }
599 
600 /*
601  * Relcache invalidation callback
602  */
603 static void
rel_sync_cache_relation_cb(Datum arg,Oid relid)604 rel_sync_cache_relation_cb(Datum arg, Oid relid)
605 {
606 	RelationSyncEntry *entry;
607 
608 	/*
609 	 * We can get here if the plugin was used in SQL interface as the
610 	 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
611 	 * is no way to unregister the relcache invalidation callback.
612 	 */
613 	if (RelationSyncCache == NULL)
614 		return;
615 
616 	/*
617 	 * Nobody keeps pointers to entries in this hash table around outside
618 	 * logical decoding callback calls - but invalidation events can come in
619 	 * *during* a callback if we access the relcache in the callback. Because
620 	 * of that we must mark the cache entry as invalid but not remove it from
621 	 * the hash while it could still be referenced, then prune it at a later
622 	 * safe point.
623 	 *
624 	 * Getting invalidations for relations that aren't in the table is
625 	 * entirely normal, since there's no way to unregister for an invalidation
626 	 * event. So we don't care if it's found or not.
627 	 */
628 	entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
629 											  HASH_FIND, NULL);
630 
631 	/*
632 	 * Reset schema sent status as the relation definition may have changed.
633 	 */
634 	if (entry != NULL)
635 		entry->schema_sent = false;
636 }
637 
638 /*
639  * Publication relation map syscache invalidation callback
640  */
641 static void
rel_sync_cache_publication_cb(Datum arg,int cacheid,uint32 hashvalue)642 rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
643 {
644 	HASH_SEQ_STATUS status;
645 	RelationSyncEntry *entry;
646 
647 	/*
648 	 * We can get here if the plugin was used in SQL interface as the
649 	 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
650 	 * is no way to unregister the relcache invalidation callback.
651 	 */
652 	if (RelationSyncCache == NULL)
653 		return;
654 
655 	/*
656 	 * There is no way to find which entry in our cache the hash belongs to so
657 	 * mark the whole cache as invalid.
658 	 */
659 	hash_seq_init(&status, RelationSyncCache);
660 	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
661 		entry->replicate_valid = false;
662 }
663