1 /*-------------------------------------------------------------------------
2 *
3 * ginutil.c
4 * Utility routines for the Postgres inverted index access method.
5 *
6 *
7 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/access/gin/ginutil.c
12 *-------------------------------------------------------------------------
13 */
14
15 #include "postgres.h"
16
17 #include "access/gin_private.h"
18 #include "access/ginxlog.h"
19 #include "access/reloptions.h"
20 #include "access/xloginsert.h"
21 #include "catalog/pg_collation.h"
22 #include "catalog/pg_type.h"
23 #include "miscadmin.h"
24 #include "storage/indexfsm.h"
25 #include "storage/lmgr.h"
26 #include "utils/builtins.h"
27 #include "utils/index_selfuncs.h"
28 #include "utils/typcache.h"
29
30
31 /*
32 * GIN handler function: return IndexAmRoutine with access method parameters
33 * and callbacks.
34 */
35 Datum
ginhandler(PG_FUNCTION_ARGS)36 ginhandler(PG_FUNCTION_ARGS)
37 {
38 IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
39
40 amroutine->amstrategies = 0;
41 amroutine->amsupport = GINNProcs;
42 amroutine->amcanorder = false;
43 amroutine->amcanorderbyop = false;
44 amroutine->amcanbackward = false;
45 amroutine->amcanunique = false;
46 amroutine->amcanmulticol = true;
47 amroutine->amoptionalkey = true;
48 amroutine->amsearcharray = false;
49 amroutine->amsearchnulls = false;
50 amroutine->amstorage = true;
51 amroutine->amclusterable = false;
52 amroutine->ampredlocks = false;
53 amroutine->amcanparallel = false;
54 amroutine->amkeytype = InvalidOid;
55
56 amroutine->ambuild = ginbuild;
57 amroutine->ambuildempty = ginbuildempty;
58 amroutine->aminsert = gininsert;
59 amroutine->ambulkdelete = ginbulkdelete;
60 amroutine->amvacuumcleanup = ginvacuumcleanup;
61 amroutine->amcanreturn = NULL;
62 amroutine->amcostestimate = gincostestimate;
63 amroutine->amoptions = ginoptions;
64 amroutine->amproperty = NULL;
65 amroutine->amvalidate = ginvalidate;
66 amroutine->ambeginscan = ginbeginscan;
67 amroutine->amrescan = ginrescan;
68 amroutine->amgettuple = NULL;
69 amroutine->amgetbitmap = gingetbitmap;
70 amroutine->amendscan = ginendscan;
71 amroutine->ammarkpos = NULL;
72 amroutine->amrestrpos = NULL;
73 amroutine->amestimateparallelscan = NULL;
74 amroutine->aminitparallelscan = NULL;
75 amroutine->amparallelrescan = NULL;
76
77 PG_RETURN_POINTER(amroutine);
78 }
79
80 /*
81 * initGinState: fill in an empty GinState struct to describe the index
82 *
83 * Note: assorted subsidiary data is allocated in the CurrentMemoryContext.
84 */
85 void
initGinState(GinState * state,Relation index)86 initGinState(GinState *state, Relation index)
87 {
88 TupleDesc origTupdesc = RelationGetDescr(index);
89 int i;
90
91 MemSet(state, 0, sizeof(GinState));
92
93 state->index = index;
94 state->oneCol = (origTupdesc->natts == 1) ? true : false;
95 state->origTupdesc = origTupdesc;
96
97 for (i = 0; i < origTupdesc->natts; i++)
98 {
99 if (state->oneCol)
100 state->tupdesc[i] = state->origTupdesc;
101 else
102 {
103 state->tupdesc[i] = CreateTemplateTupleDesc(2, false);
104
105 TupleDescInitEntry(state->tupdesc[i], (AttrNumber) 1, NULL,
106 INT2OID, -1, 0);
107 TupleDescInitEntry(state->tupdesc[i], (AttrNumber) 2, NULL,
108 origTupdesc->attrs[i]->atttypid,
109 origTupdesc->attrs[i]->atttypmod,
110 origTupdesc->attrs[i]->attndims);
111 TupleDescInitEntryCollation(state->tupdesc[i], (AttrNumber) 2,
112 origTupdesc->attrs[i]->attcollation);
113 }
114
115 /*
116 * If the compare proc isn't specified in the opclass definition, look
117 * up the index key type's default btree comparator.
118 */
119 if (index_getprocid(index, i + 1, GIN_COMPARE_PROC) != InvalidOid)
120 {
121 fmgr_info_copy(&(state->compareFn[i]),
122 index_getprocinfo(index, i + 1, GIN_COMPARE_PROC),
123 CurrentMemoryContext);
124 }
125 else
126 {
127 TypeCacheEntry *typentry;
128
129 typentry = lookup_type_cache(origTupdesc->attrs[i]->atttypid,
130 TYPECACHE_CMP_PROC_FINFO);
131 if (!OidIsValid(typentry->cmp_proc_finfo.fn_oid))
132 ereport(ERROR,
133 (errcode(ERRCODE_UNDEFINED_FUNCTION),
134 errmsg("could not identify a comparison function for type %s",
135 format_type_be(origTupdesc->attrs[i]->atttypid))));
136 fmgr_info_copy(&(state->compareFn[i]),
137 &(typentry->cmp_proc_finfo),
138 CurrentMemoryContext);
139 }
140
141 /* Opclass must always provide extract procs */
142 fmgr_info_copy(&(state->extractValueFn[i]),
143 index_getprocinfo(index, i + 1, GIN_EXTRACTVALUE_PROC),
144 CurrentMemoryContext);
145 fmgr_info_copy(&(state->extractQueryFn[i]),
146 index_getprocinfo(index, i + 1, GIN_EXTRACTQUERY_PROC),
147 CurrentMemoryContext);
148
149 /*
150 * Check opclass capability to do tri-state or binary logic consistent
151 * check.
152 */
153 if (index_getprocid(index, i + 1, GIN_TRICONSISTENT_PROC) != InvalidOid)
154 {
155 fmgr_info_copy(&(state->triConsistentFn[i]),
156 index_getprocinfo(index, i + 1, GIN_TRICONSISTENT_PROC),
157 CurrentMemoryContext);
158 }
159
160 if (index_getprocid(index, i + 1, GIN_CONSISTENT_PROC) != InvalidOid)
161 {
162 fmgr_info_copy(&(state->consistentFn[i]),
163 index_getprocinfo(index, i + 1, GIN_CONSISTENT_PROC),
164 CurrentMemoryContext);
165 }
166
167 if (state->consistentFn[i].fn_oid == InvalidOid &&
168 state->triConsistentFn[i].fn_oid == InvalidOid)
169 {
170 elog(ERROR, "missing GIN support function (%d or %d) for attribute %d of index \"%s\"",
171 GIN_CONSISTENT_PROC, GIN_TRICONSISTENT_PROC,
172 i + 1, RelationGetRelationName(index));
173 }
174
175 /*
176 * Check opclass capability to do partial match.
177 */
178 if (index_getprocid(index, i + 1, GIN_COMPARE_PARTIAL_PROC) != InvalidOid)
179 {
180 fmgr_info_copy(&(state->comparePartialFn[i]),
181 index_getprocinfo(index, i + 1, GIN_COMPARE_PARTIAL_PROC),
182 CurrentMemoryContext);
183 state->canPartialMatch[i] = true;
184 }
185 else
186 {
187 state->canPartialMatch[i] = false;
188 }
189
190 /*
191 * If the index column has a specified collation, we should honor that
192 * while doing comparisons. However, we may have a collatable storage
193 * type for a noncollatable indexed data type (for instance, hstore
194 * uses text index entries). If there's no index collation then
195 * specify default collation in case the support functions need
196 * collation. This is harmless if the support functions don't care
197 * about collation, so we just do it unconditionally. (We could
198 * alternatively call get_typcollation, but that seems like expensive
199 * overkill --- there aren't going to be any cases where a GIN storage
200 * type has a nondefault collation.)
201 */
202 if (OidIsValid(index->rd_indcollation[i]))
203 state->supportCollation[i] = index->rd_indcollation[i];
204 else
205 state->supportCollation[i] = DEFAULT_COLLATION_OID;
206 }
207 }
208
209 /*
210 * Extract attribute (column) number of stored entry from GIN tuple
211 */
212 OffsetNumber
gintuple_get_attrnum(GinState * ginstate,IndexTuple tuple)213 gintuple_get_attrnum(GinState *ginstate, IndexTuple tuple)
214 {
215 OffsetNumber colN;
216
217 if (ginstate->oneCol)
218 {
219 /* column number is not stored explicitly */
220 colN = FirstOffsetNumber;
221 }
222 else
223 {
224 Datum res;
225 bool isnull;
226
227 /*
228 * First attribute is always int16, so we can safely use any tuple
229 * descriptor to obtain first attribute of tuple
230 */
231 res = index_getattr(tuple, FirstOffsetNumber, ginstate->tupdesc[0],
232 &isnull);
233 Assert(!isnull);
234
235 colN = DatumGetUInt16(res);
236 Assert(colN >= FirstOffsetNumber && colN <= ginstate->origTupdesc->natts);
237 }
238
239 return colN;
240 }
241
242 /*
243 * Extract stored datum (and possible null category) from GIN tuple
244 */
245 Datum
gintuple_get_key(GinState * ginstate,IndexTuple tuple,GinNullCategory * category)246 gintuple_get_key(GinState *ginstate, IndexTuple tuple,
247 GinNullCategory *category)
248 {
249 Datum res;
250 bool isnull;
251
252 if (ginstate->oneCol)
253 {
254 /*
255 * Single column index doesn't store attribute numbers in tuples
256 */
257 res = index_getattr(tuple, FirstOffsetNumber, ginstate->origTupdesc,
258 &isnull);
259 }
260 else
261 {
262 /*
263 * Since the datum type depends on which index column it's from, we
264 * must be careful to use the right tuple descriptor here.
265 */
266 OffsetNumber colN = gintuple_get_attrnum(ginstate, tuple);
267
268 res = index_getattr(tuple, OffsetNumberNext(FirstOffsetNumber),
269 ginstate->tupdesc[colN - 1],
270 &isnull);
271 }
272
273 if (isnull)
274 *category = GinGetNullCategory(tuple, ginstate);
275 else
276 *category = GIN_CAT_NORM_KEY;
277
278 return res;
279 }
280
281 /*
282 * Allocate a new page (either by recycling, or by extending the index file)
283 * The returned buffer is already pinned and exclusive-locked
284 * Caller is responsible for initializing the page by calling GinInitBuffer
285 */
286 Buffer
GinNewBuffer(Relation index)287 GinNewBuffer(Relation index)
288 {
289 Buffer buffer;
290 bool needLock;
291
292 /* First, try to get a page from FSM */
293 for (;;)
294 {
295 BlockNumber blkno = GetFreeIndexPage(index);
296
297 if (blkno == InvalidBlockNumber)
298 break;
299
300 buffer = ReadBuffer(index, blkno);
301
302 /*
303 * We have to guard against the possibility that someone else already
304 * recycled this page; the buffer may be locked if so.
305 */
306 if (ConditionalLockBuffer(buffer))
307 {
308 if (GinPageIsRecyclable(BufferGetPage(buffer)))
309 return buffer; /* OK to use */
310
311 LockBuffer(buffer, GIN_UNLOCK);
312 }
313
314 /* Can't use it, so release buffer and try again */
315 ReleaseBuffer(buffer);
316 }
317
318 /* Must extend the file */
319 needLock = !RELATION_IS_LOCAL(index);
320 if (needLock)
321 LockRelationForExtension(index, ExclusiveLock);
322
323 buffer = ReadBuffer(index, P_NEW);
324 LockBuffer(buffer, GIN_EXCLUSIVE);
325
326 if (needLock)
327 UnlockRelationForExtension(index, ExclusiveLock);
328
329 return buffer;
330 }
331
332 void
GinInitPage(Page page,uint32 f,Size pageSize)333 GinInitPage(Page page, uint32 f, Size pageSize)
334 {
335 GinPageOpaque opaque;
336
337 PageInit(page, pageSize, sizeof(GinPageOpaqueData));
338
339 opaque = GinPageGetOpaque(page);
340 memset(opaque, 0, sizeof(GinPageOpaqueData));
341 opaque->flags = f;
342 opaque->rightlink = InvalidBlockNumber;
343 }
344
345 void
GinInitBuffer(Buffer b,uint32 f)346 GinInitBuffer(Buffer b, uint32 f)
347 {
348 GinInitPage(BufferGetPage(b), f, BufferGetPageSize(b));
349 }
350
351 void
GinInitMetabuffer(Buffer b)352 GinInitMetabuffer(Buffer b)
353 {
354 GinMetaPageData *metadata;
355 Page page = BufferGetPage(b);
356
357 GinInitPage(page, GIN_META, BufferGetPageSize(b));
358
359 metadata = GinPageGetMeta(page);
360
361 metadata->head = metadata->tail = InvalidBlockNumber;
362 metadata->tailFreeSize = 0;
363 metadata->nPendingPages = 0;
364 metadata->nPendingHeapTuples = 0;
365 metadata->nTotalPages = 0;
366 metadata->nEntryPages = 0;
367 metadata->nDataPages = 0;
368 metadata->nEntries = 0;
369 metadata->ginVersion = GIN_CURRENT_VERSION;
370 }
371
372 /*
373 * Compare two keys of the same index column
374 */
375 int
ginCompareEntries(GinState * ginstate,OffsetNumber attnum,Datum a,GinNullCategory categorya,Datum b,GinNullCategory categoryb)376 ginCompareEntries(GinState *ginstate, OffsetNumber attnum,
377 Datum a, GinNullCategory categorya,
378 Datum b, GinNullCategory categoryb)
379 {
380 /* if not of same null category, sort by that first */
381 if (categorya != categoryb)
382 return (categorya < categoryb) ? -1 : 1;
383
384 /* all null items in same category are equal */
385 if (categorya != GIN_CAT_NORM_KEY)
386 return 0;
387
388 /* both not null, so safe to call the compareFn */
389 return DatumGetInt32(FunctionCall2Coll(&ginstate->compareFn[attnum - 1],
390 ginstate->supportCollation[attnum - 1],
391 a, b));
392 }
393
394 /*
395 * Compare two keys of possibly different index columns
396 */
397 int
ginCompareAttEntries(GinState * ginstate,OffsetNumber attnuma,Datum a,GinNullCategory categorya,OffsetNumber attnumb,Datum b,GinNullCategory categoryb)398 ginCompareAttEntries(GinState *ginstate,
399 OffsetNumber attnuma, Datum a, GinNullCategory categorya,
400 OffsetNumber attnumb, Datum b, GinNullCategory categoryb)
401 {
402 /* attribute number is the first sort key */
403 if (attnuma != attnumb)
404 return (attnuma < attnumb) ? -1 : 1;
405
406 return ginCompareEntries(ginstate, attnuma, a, categorya, b, categoryb);
407 }
408
409
410 /*
411 * Support for sorting key datums in ginExtractEntries
412 *
413 * Note: we only have to worry about null and not-null keys here;
414 * ginExtractEntries never generates more than one placeholder null,
415 * so it doesn't have to sort those.
416 */
417 typedef struct
418 {
419 Datum datum;
420 bool isnull;
421 } keyEntryData;
422
423 typedef struct
424 {
425 FmgrInfo *cmpDatumFunc;
426 Oid collation;
427 bool haveDups;
428 } cmpEntriesArg;
429
430 static int
cmpEntries(const void * a,const void * b,void * arg)431 cmpEntries(const void *a, const void *b, void *arg)
432 {
433 const keyEntryData *aa = (const keyEntryData *) a;
434 const keyEntryData *bb = (const keyEntryData *) b;
435 cmpEntriesArg *data = (cmpEntriesArg *) arg;
436 int res;
437
438 if (aa->isnull)
439 {
440 if (bb->isnull)
441 res = 0; /* NULL "=" NULL */
442 else
443 res = 1; /* NULL ">" not-NULL */
444 }
445 else if (bb->isnull)
446 res = -1; /* not-NULL "<" NULL */
447 else
448 res = DatumGetInt32(FunctionCall2Coll(data->cmpDatumFunc,
449 data->collation,
450 aa->datum, bb->datum));
451
452 /*
453 * Detect if we have any duplicates. If there are equal keys, qsort must
454 * compare them at some point, else it wouldn't know whether one should go
455 * before or after the other.
456 */
457 if (res == 0)
458 data->haveDups = true;
459
460 return res;
461 }
462
463
464 /*
465 * Extract the index key values from an indexable item
466 *
467 * The resulting key values are sorted, and any duplicates are removed.
468 * This avoids generating redundant index entries.
469 */
470 Datum *
ginExtractEntries(GinState * ginstate,OffsetNumber attnum,Datum value,bool isNull,int32 * nentries,GinNullCategory ** categories)471 ginExtractEntries(GinState *ginstate, OffsetNumber attnum,
472 Datum value, bool isNull,
473 int32 *nentries, GinNullCategory **categories)
474 {
475 Datum *entries;
476 bool *nullFlags;
477 int32 i;
478
479 /*
480 * We don't call the extractValueFn on a null item. Instead generate a
481 * placeholder.
482 */
483 if (isNull)
484 {
485 *nentries = 1;
486 entries = (Datum *) palloc(sizeof(Datum));
487 entries[0] = (Datum) 0;
488 *categories = (GinNullCategory *) palloc(sizeof(GinNullCategory));
489 (*categories)[0] = GIN_CAT_NULL_ITEM;
490 return entries;
491 }
492
493 /* OK, call the opclass's extractValueFn */
494 nullFlags = NULL; /* in case extractValue doesn't set it */
495 entries = (Datum *)
496 DatumGetPointer(FunctionCall3Coll(&ginstate->extractValueFn[attnum - 1],
497 ginstate->supportCollation[attnum - 1],
498 value,
499 PointerGetDatum(nentries),
500 PointerGetDatum(&nullFlags)));
501
502 /*
503 * Generate a placeholder if the item contained no keys.
504 */
505 if (entries == NULL || *nentries <= 0)
506 {
507 *nentries = 1;
508 entries = (Datum *) palloc(sizeof(Datum));
509 entries[0] = (Datum) 0;
510 *categories = (GinNullCategory *) palloc(sizeof(GinNullCategory));
511 (*categories)[0] = GIN_CAT_EMPTY_ITEM;
512 return entries;
513 }
514
515 /*
516 * If the extractValueFn didn't create a nullFlags array, create one,
517 * assuming that everything's non-null. Otherwise, run through the array
518 * and make sure each value is exactly 0 or 1; this ensures binary
519 * compatibility with the GinNullCategory representation.
520 */
521 if (nullFlags == NULL)
522 nullFlags = (bool *) palloc0(*nentries * sizeof(bool));
523 else
524 {
525 for (i = 0; i < *nentries; i++)
526 nullFlags[i] = (nullFlags[i] ? true : false);
527 }
528 /* now we can use the nullFlags as category codes */
529 *categories = (GinNullCategory *) nullFlags;
530
531 /*
532 * If there's more than one key, sort and unique-ify.
533 *
534 * XXX Using qsort here is notationally painful, and the overhead is
535 * pretty bad too. For small numbers of keys it'd likely be better to use
536 * a simple insertion sort.
537 */
538 if (*nentries > 1)
539 {
540 keyEntryData *keydata;
541 cmpEntriesArg arg;
542
543 keydata = (keyEntryData *) palloc(*nentries * sizeof(keyEntryData));
544 for (i = 0; i < *nentries; i++)
545 {
546 keydata[i].datum = entries[i];
547 keydata[i].isnull = nullFlags[i];
548 }
549
550 arg.cmpDatumFunc = &ginstate->compareFn[attnum - 1];
551 arg.collation = ginstate->supportCollation[attnum - 1];
552 arg.haveDups = false;
553 qsort_arg(keydata, *nentries, sizeof(keyEntryData),
554 cmpEntries, (void *) &arg);
555
556 if (arg.haveDups)
557 {
558 /* there are duplicates, must get rid of 'em */
559 int32 j;
560
561 entries[0] = keydata[0].datum;
562 nullFlags[0] = keydata[0].isnull;
563 j = 1;
564 for (i = 1; i < *nentries; i++)
565 {
566 if (cmpEntries(&keydata[i - 1], &keydata[i], &arg) != 0)
567 {
568 entries[j] = keydata[i].datum;
569 nullFlags[j] = keydata[i].isnull;
570 j++;
571 }
572 }
573 *nentries = j;
574 }
575 else
576 {
577 /* easy, no duplicates */
578 for (i = 0; i < *nentries; i++)
579 {
580 entries[i] = keydata[i].datum;
581 nullFlags[i] = keydata[i].isnull;
582 }
583 }
584
585 pfree(keydata);
586 }
587
588 return entries;
589 }
590
591 bytea *
ginoptions(Datum reloptions,bool validate)592 ginoptions(Datum reloptions, bool validate)
593 {
594 relopt_value *options;
595 GinOptions *rdopts;
596 int numoptions;
597 static const relopt_parse_elt tab[] = {
598 {"fastupdate", RELOPT_TYPE_BOOL, offsetof(GinOptions, useFastUpdate)},
599 {"gin_pending_list_limit", RELOPT_TYPE_INT, offsetof(GinOptions,
600 pendingListCleanupSize)}
601 };
602
603 options = parseRelOptions(reloptions, validate, RELOPT_KIND_GIN,
604 &numoptions);
605
606 /* if none set, we're done */
607 if (numoptions == 0)
608 return NULL;
609
610 rdopts = allocateReloptStruct(sizeof(GinOptions), options, numoptions);
611
612 fillRelOptions((void *) rdopts, sizeof(GinOptions), options, numoptions,
613 validate, tab, lengthof(tab));
614
615 pfree(options);
616
617 return (bytea *) rdopts;
618 }
619
620 /*
621 * Fetch index's statistical data into *stats
622 *
623 * Note: in the result, nPendingPages can be trusted to be up-to-date,
624 * as can ginVersion; but the other fields are as of the last VACUUM.
625 */
626 void
ginGetStats(Relation index,GinStatsData * stats)627 ginGetStats(Relation index, GinStatsData *stats)
628 {
629 Buffer metabuffer;
630 Page metapage;
631 GinMetaPageData *metadata;
632
633 metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
634 LockBuffer(metabuffer, GIN_SHARE);
635 metapage = BufferGetPage(metabuffer);
636 metadata = GinPageGetMeta(metapage);
637
638 stats->nPendingPages = metadata->nPendingPages;
639 stats->nTotalPages = metadata->nTotalPages;
640 stats->nEntryPages = metadata->nEntryPages;
641 stats->nDataPages = metadata->nDataPages;
642 stats->nEntries = metadata->nEntries;
643 stats->ginVersion = metadata->ginVersion;
644
645 UnlockReleaseBuffer(metabuffer);
646 }
647
648 /*
649 * Write the given statistics to the index's metapage
650 *
651 * Note: nPendingPages and ginVersion are *not* copied over
652 */
653 void
ginUpdateStats(Relation index,const GinStatsData * stats)654 ginUpdateStats(Relation index, const GinStatsData *stats)
655 {
656 Buffer metabuffer;
657 Page metapage;
658 GinMetaPageData *metadata;
659
660 metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
661 LockBuffer(metabuffer, GIN_EXCLUSIVE);
662 metapage = BufferGetPage(metabuffer);
663 metadata = GinPageGetMeta(metapage);
664
665 START_CRIT_SECTION();
666
667 metadata->nTotalPages = stats->nTotalPages;
668 metadata->nEntryPages = stats->nEntryPages;
669 metadata->nDataPages = stats->nDataPages;
670 metadata->nEntries = stats->nEntries;
671
672 MarkBufferDirty(metabuffer);
673
674 if (RelationNeedsWAL(index))
675 {
676 XLogRecPtr recptr;
677 ginxlogUpdateMeta data;
678
679 data.node = index->rd_node;
680 data.ntuples = 0;
681 data.newRightlink = data.prevTail = InvalidBlockNumber;
682 memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
683
684 XLogBeginInsert();
685 XLogRegisterData((char *) &data, sizeof(ginxlogUpdateMeta));
686 XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT);
687
688 recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE);
689 PageSetLSN(metapage, recptr);
690 }
691
692 UnlockReleaseBuffer(metabuffer);
693
694 END_CRIT_SECTION();
695 }
696