1 /*-
2 * Copyright (c) 1999, 2020 Oracle and/or its affiliates. All rights reserved.
3 *
4 * See the file LICENSE for license information.
5 *
6 * $Id$
7 */
8
9 #include "db_config.h"
10
11 #include "db_int.h"
12 #include "dbinc/db_page.h"
13 #include "dbinc/db_am.h"
14 #include "dbinc/lock.h"
15 #include "dbinc/mp.h"
16 #include "dbinc/qam.h"
17 #include "dbinc/txn.h"
18
19 static int __qam_rr __P((DB *, DB_THREAD_INFO *, DB_TXN *,
20 const char *, const char *, const char *, qam_name_op));
21 static int __qam_set_extentsize __P((DB *, u_int32_t));
22
23 /*
24 * __qam_db_create --
25 * Queue specific initialization of the DB structure.
26 *
27 * PUBLIC: int __qam_db_create __P((DB *));
28 */
29 int
__qam_db_create(dbp)30 __qam_db_create(dbp)
31 DB *dbp;
32 {
33 QUEUE *t;
34 int ret;
35
36 /* Allocate and initialize the private queue structure. */
37 if ((ret = __os_calloc(dbp->env, 1, sizeof(QUEUE), &t)) != 0)
38 return (ret);
39 dbp->q_internal = t;
40 dbp->get_q_extentsize = __qam_get_extentsize;
41 dbp->set_q_extentsize = __qam_set_extentsize;
42
43 t->re_pad = ' ';
44
45 return (0);
46 }
47
48 /*
49 * __qam_db_close --
50 * Queue specific discard of the DB structure.
51 *
52 * PUBLIC: int __qam_db_close __P((DB *, u_int32_t));
53 */
54 int
__qam_db_close(dbp,flags)55 __qam_db_close(dbp, flags)
56 DB *dbp;
57 u_int32_t flags;
58 {
59 DB_MPOOLFILE *mpf;
60 MPFARRAY *array;
61 QUEUE *t;
62 struct __qmpf *mpfp;
63 u_int32_t i;
64 int ret, t_ret;
65
66 ret = 0;
67 if ((t = dbp->q_internal) == NULL)
68 return (0);
69
70 array = &t->array1;
71 again:
72 mpfp = array->mpfarray;
73 if (mpfp != NULL && mpfp->mpf != NULL) {
74 for (i = array->low_extent;
75 i <= array->hi_extent; i++, mpfp++) {
76 mpf = mpfp->mpf;
77 mpfp->mpf = NULL;
78 if (mpf != NULL && (t_ret = __memp_fclose(mpf,
79 LF_ISSET(DB_AM_DISCARD) ? DB_MPOOL_DISCARD : 0))
80 != 0 && ret == 0)
81 ret = t_ret;
82 }
83 __os_free(dbp->env, array->mpfarray);
84 }
85 if (t->array2.n_extent != 0) {
86 array = &t->array2;
87 array->n_extent = 0;
88 goto again;
89 }
90
91 if (LF_ISSET(DB_AM_DISCARD) &&
92 (t_ret = __qam_nameop(dbp, NULL,
93 NULL, QAM_NAME_DISCARD)) != 0 && ret == 0)
94 ret = t_ret;
95
96 if (t->path != NULL)
97 __os_free(dbp->env, t->path);
98 __os_free(dbp->env, t);
99 dbp->q_internal = NULL;
100
101 return (ret);
102 }
103
104 /*
105 * __qam_get_extentsize --
106 * The DB->q_get_extentsize method.
107 *
108 * PUBLIC: int __qam_get_extentsize __P((DB *, u_int32_t *));
109 */
110 int
__qam_get_extentsize(dbp,q_extentsizep)111 __qam_get_extentsize(dbp, q_extentsizep)
112 DB *dbp;
113 u_int32_t *q_extentsizep;
114 {
115 *q_extentsizep = ((QUEUE*)dbp->q_internal)->page_ext;
116 return (0);
117 }
118
119 static int
__qam_set_extentsize(dbp,extentsize)120 __qam_set_extentsize(dbp, extentsize)
121 DB *dbp;
122 u_int32_t extentsize;
123 {
124 DB_ILLEGAL_AFTER_OPEN(dbp, "DB->set_extentsize");
125
126 if (extentsize < 1) {
127 __db_errx(dbp->env, DB_STR("1140",
128 "Extent size must be at least 1"));
129 return (EINVAL);
130 }
131
132 ((QUEUE*)dbp->q_internal)->page_ext = extentsize;
133
134 return (0);
135 }
136
137 /*
138 * __queue_pageinfo -
139 * Given a dbp, get first/last page information about a queue.
140 *
141 * PUBLIC: int __queue_pageinfo __P((DB *, db_pgno_t *, db_pgno_t *,
142 * PUBLIC: int *, int, u_int32_t));
143 */
144 int
__queue_pageinfo(dbp,firstp,lastp,emptyp,prpage,flags)145 __queue_pageinfo(dbp, firstp, lastp, emptyp, prpage, flags)
146 DB *dbp;
147 db_pgno_t *firstp, *lastp;
148 int *emptyp;
149 int prpage;
150 u_int32_t flags;
151 {
152 DB_MPOOLFILE *mpf;
153 DB_THREAD_INFO *ip;
154 QMETA *meta;
155 db_pgno_t first, i, last;
156 int empty, ret, t_ret;
157
158 mpf = dbp->mpf;
159 ENV_GET_THREAD_INFO(dbp->env, ip);
160
161 /* Find out the page number of the last page in the database. */
162 i = PGNO_BASE_MD;
163 if ((ret = __memp_fget(mpf, &i, ip, NULL, 0, &meta)) != 0)
164 return (ret);
165
166 first = QAM_RECNO_PAGE(dbp, meta->first_recno);
167 last = QAM_RECNO_PAGE(
168 dbp, meta->cur_recno == 1 ? 1 : meta->cur_recno - 1);
169
170 empty = meta->cur_recno == meta->first_recno;
171 if (firstp != NULL)
172 *firstp = first;
173 if (lastp != NULL)
174 *lastp = last;
175 if (emptyp != NULL)
176 *emptyp = empty;
177 #ifdef HAVE_STATISTICS
178 if (prpage)
179 ret = __db_prpage(dbp, (PAGE *)meta, flags);
180 #else
181 COMPQUIET(prpage, 0);
182 COMPQUIET(flags, 0);
183 #endif
184
185 if ((t_ret = __memp_fput(mpf,
186 ip, meta, dbp->priority)) != 0 && ret == 0)
187 ret = t_ret;
188
189 return (ret);
190 }
191
192 #ifdef HAVE_STATISTICS
193 /*
194 * __db_prqueue --
195 * Print out a queue
196 *
197 * PUBLIC: int __db_prqueue __P((DB *, u_int32_t));
198 */
199 int
__db_prqueue(dbp,flags)200 __db_prqueue(dbp, flags)
201 DB *dbp;
202 u_int32_t flags;
203 {
204 DBC *dbc;
205 DB_THREAD_INFO *ip;
206 PAGE *h;
207 db_pgno_t first, i, last, pg_ext, stop;
208 int empty, ret, t_ret;
209
210 if ((ret = __queue_pageinfo(dbp, &first, &last, &empty, 1, flags)) != 0)
211 return (ret);
212
213 if (empty || ret != 0)
214 return (ret);
215
216 ENV_GET_THREAD_INFO(dbp->env, ip);
217 if ((ret = __db_cursor(dbp, ip, NULL, &dbc, 0)) != 0)
218 return (ret);
219 i = first;
220 if (first > last)
221 stop = QAM_RECNO_PAGE(dbp, UINT32_MAX);
222 else
223 stop = last;
224
225 /* Dump each page. */
226 pg_ext = ((QUEUE *)dbp->q_internal)->page_ext;
227 begin:
228 for (; i <= stop; ++i) {
229 if ((ret = __qam_fget(dbc, &i, 0, &h)) != 0) {
230 if (pg_ext == 0) {
231 if (ret == DB_PAGE_NOTFOUND && first == last)
232 ret = 0;
233 goto err;
234 }
235 if (ret == ENOENT || ret == DB_PAGE_NOTFOUND) {
236 i += (pg_ext - ((i - 1) % pg_ext)) - 1;
237 ret = 0;
238 continue;
239 }
240 goto err;
241 }
242 (void)__db_prpage(dbp, h, flags);
243 if ((ret = __qam_fput(dbc, i, h, dbp->priority)) != 0)
244 goto err;
245 }
246
247 if (first > last) {
248 i = 1;
249 stop = last;
250 first = last;
251 goto begin;
252 }
253
254 err:
255 if ((t_ret = __dbc_close(dbc)) != 0 && ret == 0)
256 ret = t_ret;
257 return (ret);
258 }
259 #endif
260
261 /*
262 * __qam_remove --
263 * Remove method for a Queue.
264 *
265 * PUBLIC: int __qam_remove __P((DB *, DB_THREAD_INFO *, DB_TXN *,
266 * PUBLIC: const char *, const char *, u_int32_t));
267 */
268 int
__qam_remove(dbp,ip,txn,name,subdb,flags)269 __qam_remove(dbp, ip, txn, name, subdb, flags)
270 DB *dbp;
271 DB_THREAD_INFO *ip;
272 DB_TXN *txn;
273 const char *name, *subdb;
274 u_int32_t flags;
275 {
276 COMPQUIET(flags, 0);
277 return (__qam_rr(dbp, ip, txn, name, subdb, NULL, QAM_NAME_REMOVE));
278 }
279
280 /*
281 * __qam_rename --
282 * Rename method for a Queue.
283 *
284 * PUBLIC: int __qam_rename __P((DB *, DB_THREAD_INFO *, DB_TXN *,
285 * PUBLIC: const char *, const char *, const char *));
286 */
287 int
__qam_rename(dbp,ip,txn,name,subdb,newname)288 __qam_rename(dbp, ip, txn, name, subdb, newname)
289 DB *dbp;
290 DB_THREAD_INFO *ip;
291 DB_TXN *txn;
292 const char *name, *subdb, *newname;
293 {
294 return (__qam_rr(dbp, ip, txn, name, subdb, newname, QAM_NAME_RENAME));
295 }
296
297 /*
298 * __qam_rr --
299 * Remove/Rename method for a Queue.
300 */
301 static int
__qam_rr(dbp,ip,txn,name,subdb,newname,op)302 __qam_rr(dbp, ip, txn, name, subdb, newname, op)
303 DB *dbp;
304 DB_THREAD_INFO *ip;
305 DB_TXN *txn;
306 const char *name, *subdb, *newname;
307 qam_name_op op;
308 {
309 DB *tmpdbp;
310 ENV *env;
311 QUEUE *qp;
312 int ret, t_ret;
313
314 env = dbp->env;
315 ret = 0;
316
317 if (subdb != NULL && name != NULL) {
318 __db_errx(env, DB_STR("1141",
319 "Queue does not support multiple databases per file"));
320 return (EINVAL);
321 }
322
323 /*
324 * For rename/remove, there is no need to open the database
325 * via __db_open. It is good enough to just open its memory pool
326 * which is necessary for in-mem databases.
327 */
328 if (F2_ISSET(dbp, DB2_AM_MPOOL_OPENED))
329 tmpdbp = dbp;
330 else {
331 if ((ret = __db_create_internal(&tmpdbp, env, 0)) != 0)
332 return (ret);
333
334 /*
335 * We need to make sure we don't self-deadlock, so give
336 * this dbp the same locker as the incoming one.
337 */
338 tmpdbp->locker = dbp->locker;
339 if ((ret = __db_open(tmpdbp, ip, txn,
340 name, NULL, DB_QUEUE, DB_RDONLY, 0, PGNO_BASE_MD)) != 0)
341 goto err;
342 }
343
344 qp = (QUEUE *)tmpdbp->q_internal;
345 if (qp->page_ext != 0)
346 ret = __qam_nameop(tmpdbp, txn, newname, op);
347
348 if (!F2_ISSET(dbp, DB2_AM_MPOOL_OPENED)) {
349 err:
350 /* We need to remove the lock event we associated with this. */
351 if (txn != NULL)
352 __txn_remlock(env, txn, NULL, tmpdbp->locker);
353
354 /*
355 * Since we copied the locker ID from the dbp, we'd better not
356 * free it here.
357 */
358 tmpdbp->locker = NULL;
359
360 if ((t_ret = __db_close(tmpdbp,
361 txn, DB_NOSYNC)) != 0 && ret == 0)
362 ret = t_ret;
363 }
364 return (ret);
365 }
366
367 /*
368 * __qam_map_flags --
369 * Map queue-specific flags from public to the internal values.
370 *
371 * PUBLIC: void __qam_map_flags __P((DB *, u_int32_t *, u_int32_t *));
372 */
373 void
__qam_map_flags(dbp,inflagsp,outflagsp)374 __qam_map_flags(dbp, inflagsp, outflagsp)
375 DB *dbp;
376 u_int32_t *inflagsp, *outflagsp;
377 {
378 COMPQUIET(dbp, NULL);
379
380 if (FLD_ISSET(*inflagsp, DB_INORDER)) {
381 FLD_SET(*outflagsp, DB_AM_INORDER);
382 FLD_CLR(*inflagsp, DB_INORDER);
383 }
384 }
385
386 /*
387 * __qam_set_flags --
388 * Set queue-specific flags.
389 *
390 * PUBLIC: int __qam_set_flags __P((DB *, u_int32_t *flagsp));
391 */
392 int
__qam_set_flags(dbp,flagsp)393 __qam_set_flags(dbp, flagsp)
394 DB *dbp;
395 u_int32_t *flagsp;
396 {
397
398 __qam_map_flags(dbp, flagsp, &dbp->flags);
399 return (0);
400 }
401