/*-
 * Copyright (c) 2008-2011 Varnish Software AS
 * All rights reserved.
 *
 * Author: Poul-Henning Kamp <phk@phk.freebsd.dk>
 *
 * SPDX-License-Identifier: BSD-2-Clause
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * Persistent storage method
 *
 * XXX: Before we start the client or maybe after it stops, we should give the
 * XXX: stevedores a chance to examine their storage for consistency.
 *
 * XXX: Do we ever free the LRU-lists ?
 */

#include "config.h"

#include "cache/cache_varnishd.h"

#include <sys/mman.h>

#include <stdio.h>
#include <stdlib.h>

#include "cache/cache_obj.h"
#include "cache/cache_objhead.h"
#include "storage/storage.h"
#include "storage/storage_simple.h"

#include "vcli_serve.h"
#include "vsha256.h"
#include "vtim.h"

#include "storage/storage_persistent.h"

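/*
 * Object methods for persistent objects: a copy of the SML methods with
 * objfree overridden for this stevedore (set up in smp_init() below).
 */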
static struct obj_methods smp_oc_realmethods;

static struct VSC_lck *lck_smp;

static void smp_init(void);

/*--------------------------------------------------------------------*/

/*
 * silos is unlocked; it only changes during startup, while we are
 * single-threaded.
 */
static VTAILQ_HEAD(,smp_sc)	silos = VTAILQ_HEAD_INITIALIZER(silos);

/*--------------------------------------------------------------------
 * Add bans to silos
 */

static int
smp_appendban(const struct smp_sc *sc, struct smp_signspace *spc,
    uint32_t len, const uint8_t *ban)
{

	(void)sc;
	if (SIGNSPACE_FREE(spc) < len)
		return (-1);

	memcpy(SIGNSPACE_FRONT(spc), ban, len);
	smp_append_signspace(spc, len);

	return (0);
}

/* Trust that cache_ban.c takes care of locking */

static int
smp_baninfo(const struct stevedore *stv, enum baninfo event,
	    const uint8_t *ban, unsigned len)
{
	struct smp_sc *sc;
	int r = 0;

	CAST_OBJ_NOTNULL(sc, stv->priv, SMP_SC_MAGIC);

	switch (event) {
	case BI_NEW:
		r |= smp_appendban(sc, &sc->ban1, len, ban);
		r |= smp_appendban(sc, &sc->ban2, len, ban);
		break;
	default:
		/* Ignored */
		break;
	}

	return (r);
}

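/*--------------------------------------------------------------------
 * Replace the persisted ban lists with a complete export from the ban
 * machinery and sync the signatures to stable storage.
 */
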
static void
smp_banexport_spc(struct smp_signspace *spc, const uint8_t *bans, unsigned len)
{
	smp_reset_signspace(spc);
	assert(SIGNSPACE_FREE(spc) >= len);
	memcpy(SIGNSPACE_DATA(spc), bans, len);
	smp_append_signspace(spc, len);
	smp_sync_sign(&spc->ctx);
}

static void
smp_banexport(const struct stevedore *stv, const uint8_t *bans, unsigned len)
{
	struct smp_sc *sc;

	CAST_OBJ_NOTNULL(sc, stv->priv, SMP_SC_MAGIC);
	smp_banexport_spc(&sc->ban1, bans, len);
	smp_banexport_spc(&sc->ban2, bans, len);
}

/*--------------------------------------------------------------------
 * Attempt to open and read in a ban list
 */

static int
smp_open_bans(const struct smp_sc *sc, struct smp_signspace *spc)
{
	uint8_t *ptr, *pe;
	int i;

	ASSERT_CLI();
	(void)sc;
	i = smp_chk_signspace(spc);
	if (i)
		return (i);

	ptr = SIGNSPACE_DATA(spc);
	pe = SIGNSPACE_FRONT(spc);
	BAN_Reload(ptr, pe - ptr);

	return (0);
}

/*--------------------------------------------------------------------
 * Attempt to open and read in a segment list
 */

static int
smp_open_segs(struct smp_sc *sc, struct smp_signspace *spc)
{
	uint64_t length, l;
	struct smp_segptr *ss, *se;
	struct smp_seg *sg, *sg1, *sg2;
	int i, n = 0;

	ASSERT_CLI();
	i = smp_chk_signspace(spc);
	if (i)
		return (i);

	ss = SIGNSPACE_DATA(spc);
	length = SIGNSPACE_LEN(spc);

	if (length == 0) {
		/* No segments */
		sc->free_offset = sc->ident->stuff[SMP_SPC_STUFF];
		return (0);
	}
	se = ss + length / sizeof *ss;
	se--;
	assert(ss <= se);

	/*
	 * Locate the free reserve.  There are only two basic cases,
	 * but once we start dropping segments, things get more complicated.
	 */

	sc->free_offset = se->offset + se->length;
	l = sc->mediasize - sc->free_offset;
	if (se->offset > ss->offset && l >= sc->free_reserve) {
		/*
		 * [__xxxxyyyyzzzz___]
		 * Plenty of space at tail, do nothing.
		 */
	} else if (ss->offset > se->offset) {
		/*
		 * [zzzz____xxxxyyyy_]
		 * (make) space between ends
		 * We might nuke the entire tail end without getting
		 * enough space, in which case we fall through to the
		 * last check.
		 */
		while (ss < se && ss->offset > se->offset) {
			l = ss->offset - (se->offset + se->length);
			if (l > sc->free_reserve)
				break;
			ss++;
			n++;
		}
	}

	if (l < sc->free_reserve) {
		/*
		 * [__xxxxyyyyzzzz___]
		 * (make) space at front
		 */
		sc->free_offset = sc->ident->stuff[SMP_SPC_STUFF];
		while (ss < se) {
			l = ss->offset - sc->free_offset;
			if (l > sc->free_reserve)
				break;
			ss++;
			n++;
		}
	}

	assert(l >= sc->free_reserve);


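	/*
	 * Instantiate an in-core smp_seg for each remaining on-disk
	 * segment, checking that it does not overlap the first or the
	 * previous segment.
	 */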
	sg1 = NULL;
	sg2 = NULL;
	for (; ss <= se; ss++) {
		ALLOC_OBJ(sg, SMP_SEG_MAGIC);
		AN(sg);
		VTAILQ_INIT(&sg->objcores);
		sg->p = *ss;

		sg->flags |= SMP_SEG_MUSTLOAD;

		/*
		 * HACK: prevent save_segs from nuking segment until we have
		 * HACK: loaded it.
		 */
		sg->nobj = 1;
		if (sg1 != NULL) {
			assert(sg1->p.offset != sg->p.offset);
			if (sg1->p.offset < sg->p.offset)
				assert(smp_segend(sg1) <= sg->p.offset);
			else
				assert(smp_segend(sg) <= sg1->p.offset);
		}
		if (sg2 != NULL) {
			assert(sg2->p.offset != sg->p.offset);
			if (sg2->p.offset < sg->p.offset)
				assert(smp_segend(sg2) <= sg->p.offset);
			else
				assert(smp_segend(sg) <= sg2->p.offset);
		}

		/* XXX: check that they are inside silo */
		/* XXX: check that they don't overlap */
		/* XXX: check that they are serial */
		sg->sc = sc;
		VTAILQ_INSERT_TAIL(&sc->segments, sg, list);
		sg2 = sg;
		if (sg1 == NULL)
			sg1 = sg;
	}
	printf("Dropped %d segments to make free_reserve\n", n);
	return (0);
}

/*--------------------------------------------------------------------
 * Silo worker thread
 */

static void * v_matchproto_(bgthread_t)
smp_thread(struct worker *wrk, void *priv)
{
	struct smp_sc	*sc;
	struct smp_seg *sg;

	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
	CAST_OBJ_NOTNULL(sc, priv, SMP_SC_MAGIC);
	sc->thread = pthread_self();

	/* First, load all the objects from all segments */
	VTAILQ_FOREACH(sg, &sc->segments, list)
		if (sg->flags & SMP_SEG_MUSTLOAD)
			smp_load_seg(wrk, sc, sg);

	sc->flags |= SMP_SC_LOADED;
	BAN_Release();
	printf("Silo completely loaded\n");

	/* Housekeeping loop */
	Lck_Lock(&sc->mtx);
	while (!(sc->flags & SMP_SC_STOP)) {
		sg = VTAILQ_FIRST(&sc->segments);
		if (sg != NULL && sg != sc->cur_seg && sg->nobj == 0)
			smp_save_segs(sc);

		Lck_Unlock(&sc->mtx);
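		/* Sleep roughly 1.14 s (pi - 2) between housekeeping passes */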
		VTIM_sleep(3.14159265359 - 2);
		Lck_Lock(&sc->mtx);
	}

	smp_save_segs(sc);

	Lck_Unlock(&sc->mtx);
	pthread_exit(0);

	NEEDLESS(return (NULL));
}

/*--------------------------------------------------------------------
 * Open a silo in the worker process
 */

static void v_matchproto_(storage_open_f)
smp_open(struct stevedore *st)
{
	struct smp_sc	*sc;

	ASSERT_CLI();

	if (VTAILQ_EMPTY(&silos))
		smp_init();

	CAST_OBJ_NOTNULL(sc, st->priv, SMP_SC_MAGIC);

	Lck_New(&sc->mtx, lck_smp);
	Lck_Lock(&sc->mtx);

	sc->stevedore = st;

	/* We trust the parent to give us a valid silo, for good measure: */
	AZ(smp_valid_silo(sc));

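	/* Write-protect the first page of the silo (the header) */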
	AZ(mprotect((void*)sc->base, 4096, PROT_READ));

	sc->ident = SIGN_DATA(&sc->idn);

	/* Check ban lists */
	if (smp_chk_signspace(&sc->ban1)) {
		/* Ban list 1 is broken, use ban2 */
		AZ(smp_chk_signspace(&sc->ban2));
		smp_copy_signspace(&sc->ban1, &sc->ban2);
		smp_sync_sign(&sc->ban1.ctx);
	} else {
		/* Ban1 is OK, copy to ban2 for consistency */
		smp_copy_signspace(&sc->ban2, &sc->ban1);
		smp_sync_sign(&sc->ban2.ctx);
	}
	AZ(smp_open_bans(sc, &sc->ban1));

	/* We attempt seg1 first, and if that fails, try seg2 */
	if (smp_open_segs(sc, &sc->seg1))
		AZ(smp_open_segs(sc, &sc->seg2));

	/*
	 * Grab a reference to the tail of the ban list, until the thread
	 * has loaded all objects, so we can be sure that all of our
	 * proto-bans survive until then.
	 */
	BAN_Hold();

	/* XXX: save segments to ensure consistency between seg1 & seg2 ? */

	/* XXX: abandon early segments to make sure we have free space ? */

	(void)ObjSubscribeEvents(smp_oc_event, st,
	    OEV_BANCHG|OEV_TTLCHG|OEV_INSERT);

	/* Open a new segment, so we are ready to write */
	smp_new_seg(sc);

	/* Start the silo worker thread; it will load the objects */
	WRK_BgThread(&sc->bgthread, "persistence", smp_thread, sc);

	VTAILQ_INSERT_TAIL(&silos, sc, list);
	Lck_Unlock(&sc->mtx);
}

/*--------------------------------------------------------------------
 * Close a silo
 */

static void v_matchproto_(storage_close_f)
smp_close(const struct stevedore *st, int warn)
{
	struct smp_sc	*sc;
	void *status;

	ASSERT_CLI();

	CAST_OBJ_NOTNULL(sc, st->priv, SMP_SC_MAGIC);
	if (warn) {
		Lck_Lock(&sc->mtx);
		if (sc->cur_seg != NULL)
			smp_close_seg(sc, sc->cur_seg);
		AZ(sc->cur_seg);
		sc->flags |= SMP_SC_STOP;
		Lck_Unlock(&sc->mtx);
	} else {
		AZ(pthread_join(sc->bgthread, &status));
		AZ(status);
	}
}

/*--------------------------------------------------------------------
 * Allocate a bite.
 *
 * Allocate [min_size...max_size] space from the bottom of the segment,
 * as is convenient.
 *
 * If 'so' + 'idx' is given, also allocate a smp_object from the top
 * of the segment.
 *
 * Return the segment in 'ssg' if given.
 */

static struct storage *
smp_allocx(const struct stevedore *st, size_t min_size, size_t max_size,
    struct smp_object **so, unsigned *idx, struct smp_seg **ssg)
{
	struct smp_sc *sc;
	struct storage *ss;
	struct smp_seg *sg;
	uint64_t left, extra;

	CAST_OBJ_NOTNULL(sc, st->priv, SMP_SC_MAGIC);
	assert(min_size <= max_size);

	max_size = IRNUP(sc, max_size);
	min_size = IRNUP(sc, min_size);

	extra = IRNUP(sc, sizeof(*ss));
	if (so != NULL) {
		extra += sizeof(**so);
		AN(idx);
	}

	Lck_Lock(&sc->mtx);
	sg = NULL;
	ss = NULL;

	left = 0;
	if (sc->cur_seg != NULL)
		left = smp_spaceleft(sc, sc->cur_seg);
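	/*
	 * If the current segment cannot hold the request, close it and
	 * open a fresh one.
	 */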
	if (left < extra + min_size) {
		if (sc->cur_seg != NULL)
			smp_close_seg(sc, sc->cur_seg);
		smp_new_seg(sc);
		if (sc->cur_seg != NULL)
			left = smp_spaceleft(sc, sc->cur_seg);
		else
			left = 0;
	}

	if (left >= extra + min_size)  {
		AN(sc->cur_seg);
		if (left < extra + max_size)
			max_size = IRNDN(sc, left - extra);

		sg = sc->cur_seg;
		ss = (void*)(sc->base + sc->next_bot);
		sc->next_bot += max_size + IRNUP(sc, sizeof(*ss));
		sg->nalloc++;
		if (so != NULL) {
			sc->next_top -= sizeof(**so);
			*so = (void*)(sc->base + sc->next_top);
			/* Render this smp_object mostly harmless */
			EXP_ZERO((*so));
			(*so)->ban = 0.;
			(*so)->ptr = 0;
			sg->objs = *so;
			*idx = ++sg->p.lobjlist;
		}
		(void)smp_spaceleft(sc, sg);	/* for the assert */
	}
	Lck_Unlock(&sc->mtx);

	if (ss == NULL)
		return (ss);
	AN(sg);
	assert(max_size >= min_size);

	/* Fill the storage structure */
	INIT_OBJ(ss, STORAGE_MAGIC);
	ss->ptr = PRNUP(sc, ss + 1);
	ss->space = max_size;
	ss->priv = sc;
	if (ssg != NULL)
		*ssg = sg;
	return (ss);
}

/*--------------------------------------------------------------------
 * Allocate an object
 */

static int v_matchproto_(storage_allocobj_f)
smp_allocobj(struct worker *wrk, const struct stevedore *stv,
    struct objcore *oc, unsigned wsl)
{
	struct object *o;
	struct storage *st;
	struct smp_sc	*sc;
	struct smp_seg *sg;
	struct smp_object *so;
	unsigned objidx;
	unsigned ltot;

	CHECK_OBJ_NOTNULL(wrk, WORKER_MAGIC);
	CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
	CAST_OBJ_NOTNULL(sc, stv->priv, SMP_SC_MAGIC);

	/* Don't entertain already dead objects */
	if (oc->flags & OC_F_DYING)
		return (0);
	if (oc->t_origin <= 0.)
		return (0);
	if (oc->ttl + oc->grace + oc->keep <= 0.)
		return (0);

	ltot = sizeof(struct object) + PRNDUP(wsl);
	ltot = IRNUP(sc, ltot);

	st = NULL;
	sg = NULL;
	so = NULL;
	objidx = 0;

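	/*
	 * Try to allocate; if the silo is full, nuke objects off the LRU
	 * list and retry until we succeed or run out of victims.
	 */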
	do {
		st = smp_allocx(stv, ltot, ltot, &so, &objidx, &sg);
		if (st != NULL && st->space < ltot) {
			stv->sml_free(st);		// NOP
			st = NULL;
		}
	} while (st == NULL && LRU_NukeOne(wrk, stv->lru));
	if (st == NULL)
		return (0);

	AN(st);
	AN(sg);
	AN(so);
	assert(st->space >= ltot);

	o = SML_MkObject(stv, oc, st->ptr);
	AN(oc->stobj->stevedore);
	assert(oc->stobj->stevedore == stv);
	CHECK_OBJ_NOTNULL(o, OBJECT_MAGIC);
	o->objstore = st;
	st->len = sizeof(*o);

	Lck_Lock(&sc->mtx);
	sg->nfixed++;
	sg->nobj++;

	/* We have to do this somewhere, might as well be here... */
	assert(sizeof so->hash == DIGEST_LEN);
	memcpy(so->hash, oc->objhead->digest, DIGEST_LEN);
	EXP_COPY(so, oc);
	so->ptr = (uint8_t*)o - sc->base;
	so->ban = BAN_Time(oc->ban);

	smp_init_oc(oc, sg, objidx);

	VTAILQ_INSERT_TAIL(&sg->objcores, oc, lru_list);
	Lck_Unlock(&sc->mtx);
	return (1);
}

/*--------------------------------------------------------------------
 * Allocate a bite
 */

static struct storage * v_matchproto_(sml_alloc_f)
smp_alloc(const struct stevedore *st, size_t size)
{

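	/*
	 * Ask for up to 'size' bytes, but accept as little as 4 KiB
	 * (or 'size' itself, if that is smaller).
	 */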
	return (smp_allocx(st,
	    size > 4096 ? 4096 : size, size, NULL, NULL, NULL));
}

/*--------------------------------------------------------------------*/

const struct stevedore smp_stevedore = {
	.magic		= STEVEDORE_MAGIC,
	.name		= "deprecated_persistent",
	.init		= smp_mgt_init,
	.open		= smp_open,
	.close		= smp_close,
	.allocobj	= smp_allocobj,
	.baninfo	= smp_baninfo,
	.banexport	= smp_banexport,
	.methods	= &smp_oc_realmethods,

	.sml_alloc	= smp_alloc,
	.sml_free	= NULL,
	.sml_getobj	= smp_sml_getobj,
};

/*--------------------------------------------------------------------
 * Persistence is a bear to test unadulterated, so we cheat by adding
 * a cli command we can use to make it do tricks for us.
 */

static void
debug_report_silo(struct cli *cli, const struct smp_sc *sc)
{
	struct smp_seg *sg;

	VCLI_Out(cli, "Silo: %s (%s)\n",
	    sc->stevedore->ident, sc->filename);
	VTAILQ_FOREACH(sg, &sc->segments, list) {
		VCLI_Out(cli, "  Seg: [0x%jx ... +0x%jx]\n",
		   (uintmax_t)sg->p.offset, (uintmax_t)sg->p.length);
		if (sg == sc->cur_seg)
			VCLI_Out(cli,
			   "    Alloc: [0x%jx ... 0x%jx] = 0x%jx free\n",
			   (uintmax_t)(sc->next_bot),
			   (uintmax_t)(sc->next_top),
			   (uintmax_t)(sc->next_top - sc->next_bot));
		VCLI_Out(cli, "    %u nobj, %u alloc, %u lobjlist, %u fixed\n",
		    sg->nobj, sg->nalloc, sg->p.lobjlist, sg->nfixed);
	}
}

static void v_matchproto_(cli_func_t)
debug_persistent(struct cli *cli, const char * const * av, void *priv)
{
	struct smp_sc *sc;

	(void)priv;

	if (av[2] == NULL) {
		VTAILQ_FOREACH(sc, &silos, list)
			debug_report_silo(cli, sc);
		return;
	}
	VTAILQ_FOREACH(sc, &silos, list)
		if (!strcmp(av[2], sc->stevedore->ident))
			break;
	if (sc == NULL) {
		VCLI_Out(cli, "Silo <%s> not found\n", av[2]);
		VCLI_SetResult(cli, CLIS_PARAM);
		return;
	}
	if (av[3] == NULL) {
		debug_report_silo(cli, sc);
		return;
	}
	Lck_Lock(&sc->mtx);
	if (!strcmp(av[3], "sync")) {
		if (sc->cur_seg != NULL)
			smp_close_seg(sc, sc->cur_seg);
		smp_new_seg(sc);
	} else if (!strcmp(av[3], "dump")) {
		debug_report_silo(cli, sc);
	} else {
		VCLI_Out(cli, "Unknown operation\n");
		VCLI_SetResult(cli, CLIS_PARAM);
	}
	Lck_Unlock(&sc->mtx);
}

static struct cli_proto debug_cmds[] = {
	{ CLICMD_DEBUG_PERSISTENT,		"d", debug_persistent },
	{ NULL }
};

/*--------------------------------------------------------------------
 */

static void
smp_init(void)
{
	lck_smp = Lck_CreateClass(NULL, "smp");
	CLI_AddFuncs(debug_cmds);
	smp_oc_realmethods = SML_methods;
	smp_oc_realmethods.objtouch = NULL;
	smp_oc_realmethods.objfree = smp_oc_objfree;
}

/*--------------------------------------------------------------------
 * Pause until all silos have loaded.
 */

void
SMP_Ready(void)
{
	struct smp_sc *sc;

	ASSERT_CLI();
	do {
		VTAILQ_FOREACH(sc, &silos, list)
			if (!(sc->flags & SMP_SC_LOADED))
				break;
		if (sc != NULL)
			(void)sleep(1);
	} while (sc != NULL);
}