1 /*-
2  * Copyright (c) 1999, 2020 Oracle and/or its affiliates.  All rights reserved.
3  *
4  * See the file LICENSE for license information.
5  *
6  * $Id$
7  */
8 
9 #include "db_config.h"
10 
11 #include "db_int.h"
12 #include "dbinc/db_page.h"
13 #include "dbinc/db_am.h"
14 #include "dbinc/lock.h"
15 #include "dbinc/mp.h"
16 #include "dbinc/qam.h"
17 #include "dbinc/txn.h"
18 
19 static int __qam_rr __P((DB *, DB_THREAD_INFO *, DB_TXN *,
20 	       const char *, const char *, const char *, qam_name_op));
21 static int __qam_set_extentsize __P((DB *, u_int32_t));
22 
23 /*
24  * __qam_db_create --
25  *	Queue specific initialization of the DB structure.
26  *
27  * PUBLIC: int __qam_db_create __P((DB *));
28  */
29 int
__qam_db_create(dbp)30 __qam_db_create(dbp)
31 	DB *dbp;
32 {
33 	QUEUE *t;
34 	int ret;
35 
36 	/* Allocate and initialize the private queue structure. */
37 	if ((ret = __os_calloc(dbp->env, 1, sizeof(QUEUE), &t)) != 0)
38 		return (ret);
39 	dbp->q_internal = t;
40 	dbp->get_q_extentsize = __qam_get_extentsize;
41 	dbp->set_q_extentsize = __qam_set_extentsize;
42 
43 	t->re_pad = ' ';
44 
45 	return (0);
46 }
47 
48 /*
49  * __qam_db_close --
50  *	Queue specific discard of the DB structure.
51  *
52  * PUBLIC: int __qam_db_close __P((DB *, u_int32_t));
53  */
54 int
__qam_db_close(dbp,flags)55 __qam_db_close(dbp, flags)
56 	DB *dbp;
57 	u_int32_t flags;
58 {
59 	DB_MPOOLFILE *mpf;
60 	MPFARRAY *array;
61 	QUEUE *t;
62 	struct __qmpf *mpfp;
63 	u_int32_t i;
64 	int ret, t_ret;
65 
66 	ret = 0;
67 	if ((t = dbp->q_internal) == NULL)
68 		return (0);
69 
70 	array = &t->array1;
71 again:
72 	mpfp = array->mpfarray;
73 	if (mpfp != NULL && mpfp->mpf != NULL) {
74 		for (i = array->low_extent;
75 		    i <= array->hi_extent; i++, mpfp++) {
76 			mpf = mpfp->mpf;
77 			mpfp->mpf = NULL;
78 			if (mpf != NULL && (t_ret = __memp_fclose(mpf,
79 			    LF_ISSET(DB_AM_DISCARD) ? DB_MPOOL_DISCARD : 0))
80 			    != 0 && ret == 0)
81 				ret = t_ret;
82 		}
83 		__os_free(dbp->env, array->mpfarray);
84 	}
85 	if (t->array2.n_extent != 0) {
86 		array = &t->array2;
87 		array->n_extent = 0;
88 		goto again;
89 	}
90 
91 	if (LF_ISSET(DB_AM_DISCARD) &&
92 	     (t_ret = __qam_nameop(dbp, NULL,
93 	     NULL, QAM_NAME_DISCARD)) != 0 && ret == 0)
94 		ret = t_ret;
95 
96 	if (t->path != NULL)
97 		__os_free(dbp->env, t->path);
98 	__os_free(dbp->env, t);
99 	dbp->q_internal = NULL;
100 
101 	return (ret);
102 }
103 
104 /*
105  * __qam_get_extentsize --
106  *	The DB->q_get_extentsize method.
107  *
108  * PUBLIC: int __qam_get_extentsize __P((DB *, u_int32_t *));
109  */
110 int
__qam_get_extentsize(dbp,q_extentsizep)111 __qam_get_extentsize(dbp, q_extentsizep)
112 	DB *dbp;
113 	u_int32_t *q_extentsizep;
114 {
115 	*q_extentsizep = ((QUEUE*)dbp->q_internal)->page_ext;
116 	return (0);
117 }
118 
119 static int
__qam_set_extentsize(dbp,extentsize)120 __qam_set_extentsize(dbp, extentsize)
121 	DB *dbp;
122 	u_int32_t extentsize;
123 {
124 	DB_ILLEGAL_AFTER_OPEN(dbp, "DB->set_extentsize");
125 
126 	if (extentsize < 1) {
127 		__db_errx(dbp->env, DB_STR("1140",
128 		    "Extent size must be at least 1"));
129 		return (EINVAL);
130 	}
131 
132 	((QUEUE*)dbp->q_internal)->page_ext = extentsize;
133 
134 	return (0);
135 }
136 
137 /*
138  * __queue_pageinfo -
139  *	Given a dbp, get first/last page information about a queue.
140  *
141  * PUBLIC: int __queue_pageinfo __P((DB *, db_pgno_t *, db_pgno_t *,
142  * PUBLIC:       int *, int, u_int32_t));
143  */
144 int
__queue_pageinfo(dbp,firstp,lastp,emptyp,prpage,flags)145 __queue_pageinfo(dbp, firstp, lastp, emptyp, prpage, flags)
146 	DB *dbp;
147 	db_pgno_t *firstp, *lastp;
148 	int *emptyp;
149 	int prpage;
150 	u_int32_t flags;
151 {
152 	DB_MPOOLFILE *mpf;
153 	DB_THREAD_INFO *ip;
154 	QMETA *meta;
155 	db_pgno_t first, i, last;
156 	int empty, ret, t_ret;
157 
158 	mpf = dbp->mpf;
159 	ENV_GET_THREAD_INFO(dbp->env, ip);
160 
161 	/* Find out the page number of the last page in the database. */
162 	i = PGNO_BASE_MD;
163 	if ((ret = __memp_fget(mpf, &i, ip, NULL, 0, &meta)) != 0)
164 		return (ret);
165 
166 	first = QAM_RECNO_PAGE(dbp, meta->first_recno);
167 	last = QAM_RECNO_PAGE(
168 	    dbp, meta->cur_recno == 1 ? 1 : meta->cur_recno - 1);
169 
170 	empty = meta->cur_recno == meta->first_recno;
171 	if (firstp != NULL)
172 		*firstp = first;
173 	if (lastp != NULL)
174 		*lastp = last;
175 	if (emptyp != NULL)
176 		*emptyp = empty;
177 #ifdef HAVE_STATISTICS
178 	if (prpage)
179 		ret = __db_prpage(dbp, (PAGE *)meta, flags);
180 #else
181 	COMPQUIET(prpage, 0);
182 	COMPQUIET(flags, 0);
183 #endif
184 
185 	if ((t_ret = __memp_fput(mpf,
186 	    ip, meta, dbp->priority)) != 0 && ret == 0)
187 		ret = t_ret;
188 
189 	return (ret);
190 }
191 
192 #ifdef HAVE_STATISTICS
193 /*
194  * __db_prqueue --
195  *	Print out a queue
196  *
197  * PUBLIC: int __db_prqueue __P((DB *, u_int32_t));
198  */
199 int
__db_prqueue(dbp,flags)200 __db_prqueue(dbp, flags)
201 	DB *dbp;
202 	u_int32_t flags;
203 {
204 	DBC *dbc;
205 	DB_THREAD_INFO *ip;
206 	PAGE *h;
207 	db_pgno_t first, i, last, pg_ext, stop;
208 	int empty, ret, t_ret;
209 
210 	if ((ret = __queue_pageinfo(dbp, &first, &last, &empty, 1, flags)) != 0)
211 		return (ret);
212 
213 	if (empty || ret != 0)
214 		return (ret);
215 
216 	ENV_GET_THREAD_INFO(dbp->env, ip);
217 	if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0)
218 		return (ret);
219 	i = first;
220 	if (first > last)
221 		stop = QAM_RECNO_PAGE(dbp, UINT32_MAX);
222 	else
223 		stop = last;
224 
225 	/* Dump each page. */
226 	pg_ext = ((QUEUE *)dbp->q_internal)->page_ext;
227 begin:
228 	for (; i <= stop; ++i) {
229 		if ((ret = __qam_fget(dbc, &i, 0, &h)) != 0) {
230 			if (pg_ext == 0) {
231 				if (ret == DB_PAGE_NOTFOUND && first == last)
232 					ret = 0;
233 				goto err;
234 			}
235 			if (ret == ENOENT || ret == DB_PAGE_NOTFOUND) {
236 				i += (pg_ext - ((i - 1) % pg_ext)) - 1;
237 				ret = 0;
238 				continue;
239 			}
240 			goto err;
241 		}
242 		(void)__db_prpage(dbp, h, flags);
243 		if ((ret = __qam_fput(dbc, i, h, dbp->priority)) != 0)
244 			goto err;
245 	}
246 
247 	if (first > last) {
248 		i = 1;
249 		stop = last;
250 		first = last;
251 		goto begin;
252 	}
253 
254 err:
255 	if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
256 		ret = t_ret;
257 	return (ret);
258 }
259 #endif
260 
261 /*
262  * __qam_remove --
263  *	Remove method for a Queue.
264  *
265  * PUBLIC: int __qam_remove __P((DB *, DB_THREAD_INFO *, DB_TXN *,
266  * PUBLIC:    const char *, const char *, u_int32_t));
267  */
268 int
__qam_remove(dbp,ip,txn,name,subdb,flags)269 __qam_remove(dbp, ip, txn, name, subdb, flags)
270 	DB *dbp;
271 	DB_THREAD_INFO *ip;
272 	DB_TXN *txn;
273 	const char *name, *subdb;
274 	u_int32_t flags;
275 {
276 	COMPQUIET(flags, 0);
277 	return (__qam_rr(dbp, ip, txn, name, subdb, NULL, QAM_NAME_REMOVE));
278 }
279 
280 /*
281  * __qam_rename --
282  *	Rename method for a Queue.
283  *
284  * PUBLIC: int __qam_rename __P((DB *, DB_THREAD_INFO *, DB_TXN *,
285  * PUBLIC:         const char *, const char *, const char *));
286  */
287 int
__qam_rename(dbp,ip,txn,name,subdb,newname)288 __qam_rename(dbp, ip, txn, name, subdb, newname)
289 	DB *dbp;
290 	DB_THREAD_INFO *ip;
291 	DB_TXN *txn;
292 	const char *name, *subdb, *newname;
293 {
294 	return (__qam_rr(dbp, ip, txn, name, subdb, newname, QAM_NAME_RENAME));
295 }
296 
297 /*
298  * __qam_rr --
299  *	Remove/Rename method for a Queue.
300  */
301 static int
__qam_rr(dbp,ip,txn,name,subdb,newname,op)302 __qam_rr(dbp, ip, txn, name, subdb, newname, op)
303 	DB *dbp;
304 	DB_THREAD_INFO *ip;
305 	DB_TXN *txn;
306 	const char *name, *subdb, *newname;
307 	qam_name_op op;
308 {
309 	DB *tmpdbp;
310 	ENV *env;
311 	QUEUE *qp;
312 	int ret, t_ret;
313 
314 	env = dbp->env;
315 	ret = 0;
316 
317 	if (subdb != NULL && name != NULL) {
318 		__db_errx(env, DB_STR("1141",
319 		    "Queue does not support multiple databases per file"));
320 		return (EINVAL);
321 	}
322 
323 	/*
324 	 * For rename/remove, there is no need to open the database
325 	 * via __db_open.  It is good enough to just open its memory pool
326 	 * which is necessary for in-mem databases.
327 	 */
328 	if (F2_ISSET(dbp, DB2_AM_MPOOL_OPENED))
329 		tmpdbp = dbp;
330 	else {
331 		if ((ret = __db_create_internal(&tmpdbp, env, 0)) != 0)
332 			return (ret);
333 
334 		/*
335 		 * We need to make sure we don't self-deadlock, so give
336 		 * this dbp the same locker as the incoming one.
337 		 */
338 		tmpdbp->locker = dbp->locker;
339 		if ((ret = __db_open(tmpdbp, ip, txn,
340 		    name, NULL, DB_QUEUE, DB_RDONLY, 0, PGNO_BASE_MD)) != 0)
341 			goto err;
342 	}
343 
344 	qp = (QUEUE *)tmpdbp->q_internal;
345 	if (qp->page_ext != 0)
346 		ret = __qam_nameop(tmpdbp, txn, newname, op);
347 
348 	if (!F2_ISSET(dbp, DB2_AM_MPOOL_OPENED)) {
349 err:
350 		/* We need to remove the lock event we associated with this. */
351 		if (txn != NULL)
352 			__txn_remlock(env, txn, NULL, tmpdbp->locker);
353 
354 		/*
355 		 * Since we copied the locker ID from the dbp, we'd better not
356 		 * free it here.
357 		 */
358 		tmpdbp->locker = NULL;
359 
360 		if ((t_ret = __db_close(tmpdbp,
361 		    txn, DB_NOSYNC)) != 0 && ret == 0)
362 			ret = t_ret;
363 	}
364 	return (ret);
365 }
366 
367 /*
368  * __qam_map_flags --
369  *	Map queue-specific flags from public to the internal values.
370  *
371  * PUBLIC: void __qam_map_flags __P((DB *, u_int32_t *, u_int32_t *));
372  */
373 void
__qam_map_flags(dbp,inflagsp,outflagsp)374 __qam_map_flags(dbp, inflagsp, outflagsp)
375 	DB *dbp;
376 	u_int32_t *inflagsp, *outflagsp;
377 {
378 	COMPQUIET(dbp, NULL);
379 
380 	if (FLD_ISSET(*inflagsp, DB_INORDER)) {
381 		FLD_SET(*outflagsp, DB_AM_INORDER);
382 		FLD_CLR(*inflagsp, DB_INORDER);
383 	}
384 }
385 
386 /*
387  * __qam_set_flags --
388  *	Set queue-specific flags.
389  *
390  * PUBLIC: int __qam_set_flags __P((DB *, u_int32_t *flagsp));
391  */
392 int
__qam_set_flags(dbp,flagsp)393 __qam_set_flags(dbp, flagsp)
394 	DB *dbp;
395 	u_int32_t *flagsp;
396 {
397 
398 	__qam_map_flags(dbp, flagsp, &dbp->flags);
399 	return (0);
400 }
401