1 /*
2  * Ctable search routines
3  *
4  * $Id$
5  *
6  */
7 
8 #include "ctable.h"
9 
10 #include "ctable_qsort.c"
11 
12 #include "boyer_moore.c"
13 
14 #include "jsw_rand.c"
15 
16 #include "ctable_lists.c"
17 
18 #include "ctable_batch.c"
19 
20 #include "jsw_slib.c"
21 
22 #include "speedtableHash.c"
23 
24 #include <time.h>
25 
26 // forward references
27 CTABLE_INTERNAL struct cursor *
28 ctable_CreateEmptyCursor(Tcl_Interp *interp, CTable *ctable, char *name);
29 CTABLE_INTERNAL struct cursor *
30 ctable_CreateCursor(Tcl_Interp *interp, CTable *ctable, CTableSearch *search);
31 CTABLE_INTERNAL int
32 ctable_CreateCursorCommand(Tcl_Interp *interp, struct cursor *cursor);
33 CTABLE_INTERNAL Tcl_Obj *
34 ctable_CursorToName(struct cursor *cursor);
35 
36 //#define INDEXDEBUG
37 // #define MEGADEBUG
38 // #define SEARCHDEBUG
39 #ifndef TRACKFIELD
40 #define TRACKFIELD 1
41 #endif
42 // debugging routines - verify that every skiplist contains an entry for
43 // every row.
44 CTABLE_INTERNAL void
ctable_verifyField(CTable * ctable,int field,int verbose)45 ctable_verifyField(CTable *ctable, int field, int verbose)
46 {
47     ctable_BaseRow *row = NULL;
48     ctable_BaseRow *found = NULL;
49     ctable_BaseRow *walk = NULL;
50     jsw_skip_t     *skip = ctable->skipLists[field];
51     int             index = ctable->creator->fields[field]->indexNumber;
52 
53     if(verbose) fprintf(stderr, "Verifying field %s\n", ctable->creator->fields[field]->name);
54     CTABLE_LIST_FOREACH (ctable->ll_head, row, 0) {
55 	if (verbose) fprintf(stderr, "  Searching 0x%lx for 0x%lx\n", (long)skip, (long)row);
56 	// make sure the entry can be found.
57 	jsw_sreset(skip);
58 	jsw_sfind (skip, row);
59 	found = jsw_srow(skip);
60 	if(found != row) {
61 	    int count = 0;
62 	    if(!verbose) {
63 		fprintf(stderr, "Verifying field %s\n", ctable->creator->fields[field]->name);
64 		fprintf(stderr, "  Searching skip 0x%lx for row 0x%lx\n", (long)skip, (long)row);
65 	    }
66 	    fprintf(stderr, "    Walking from 0x%lx\n", (long)found);
67             // walk walkRow through the linked list of rows off this skip list
68             CTABLE_LIST_FOREACH (found, walk, index) {
69 		fprintf(stderr, "      ... 0x%lx\n", (long)walk);
70 		count++;
71 	        if(row == walk)
72 		    break;
73 	    }
74 	    if(row != walk)
75 	        Tcl_Panic("row 0x%lx not found in table", (long)row);
76 	    fprintf(stderr, "    Found after %d links\n", count);
77 	}
78     }
79     if(verbose) fprintf(stderr, "Field %s OK\n", ctable->creator->fields[field]->name);
80 }
81 
82 CTABLE_INTERNAL void
ctable_verify(Tcl_Interp * interp,CTable * ctable,int verbose)83 ctable_verify (Tcl_Interp *interp, CTable *ctable, int verbose) {
84     int field;
85 
86     if(verbose) fprintf(stderr, "Verify start.\n");
87     for(field = 0; field < ctable->creator->nFields; field++) {
88 	if(ctable->creator->fields[field]->canBeNull == 0) {
89 	    if(ctable->skipLists[field]) {
90 	        ctable_verifyField(ctable, field, verbose);
91 	    } else if(verbose) {
92 		fprintf(stderr, "No index for field %s\n", ctable->creator->fields[field]->name);
93 	    }
94 	} else if(verbose) {
95 	    fprintf(stderr, "Skipping nullable field %s\n", ctable->creator->fields[field]->name);
96 	}
97     }
98     if(verbose) fprintf(stderr, "Verify end.\n");
99 }
100 
101 
102 /*
103  * ctable_ParseFieldList - given a Tcl list object and a pointer to an array
104  * of integer field numbers and a pointer to an integer for field counts,
105  * install the field count into the field count and allocate an array of
106  * integers for the corresponding field indexes and fill that array with the
107  * field numbers corresponding to the field names in the list.
108  *
109  * It is up to the caller to free the memory pointed to through the
110  * fieldList argument.
111  *
112  * return TCL_OK if all went according to plan, else TCL_ERROR.
113  *
114  */
115 CTABLE_INTERNAL int
ctable_ParseFieldList(Tcl_Interp * interp,Tcl_Obj * fieldListObj,CONST char ** fieldNames,int ** fieldListPtr,int * fieldCountPtr)116 ctable_ParseFieldList (Tcl_Interp *interp, Tcl_Obj *fieldListObj, CONST char **fieldNames, int **fieldListPtr, int *fieldCountPtr) {
117     int             nFields;
118     Tcl_Obj       **fieldsObjv;
119     int             i;
120     int            *fieldList;
121 
122     // the fields they want us to retrieve
123     if (Tcl_ListObjGetElements (interp, fieldListObj, &nFields, &fieldsObjv) == TCL_ERROR) {
124       return TCL_ERROR;
125     }
126 
127     *fieldCountPtr = nFields;
128     *fieldListPtr = fieldList = (int *)ckalloc (sizeof (int) * nFields);
129 
130     for (i = 0; i < nFields; i++) {
131 	if (Tcl_GetIndexFromObj (interp, fieldsObjv[i], fieldNames, "field", TCL_EXACT, &fieldList[i]) != TCL_OK) {
132 	    ckfree ((char *)fieldList);
133 	    *fieldListPtr = NULL;
134 	    return TCL_ERROR;
135 	  }
136     }
137     return TCL_OK;
138 }
139 
140 //
141 // ctable_ParseSortFieldList - given a Tcl list object, and a pointer to a
142 // ctable sort structure, store the number of fields in the list in the
143 // sort structure's field count.  allocate an array of integers for the
144 // field numbers and directions and store them into the sort structure passed.
145 //
146 // Strip the prepending dash of each field, if present, and do the lookup
147 // and store the field number in the corresponding field number array.
148 //
149 // If the dash was present set the corresponding direction in the direction
150 // array to -1 else set it to 1.
151 //
152 // It is up to the caller to free the memory pointed to through the
153 // fieldList argument.
154 //
155 // return TCL_OK if all went according to plan, else TCL_ERROR.
156 //
157 //
158 CTABLE_INTERNAL int
ctable_ParseSortFieldList(Tcl_Interp * interp,Tcl_Obj * fieldListObj,CONST char ** fieldNames,CTableSort * sort)159 ctable_ParseSortFieldList (Tcl_Interp *interp, Tcl_Obj *fieldListObj, CONST char **fieldNames, CTableSort *sort) {
160     int             nFields;
161     Tcl_Obj       **fieldsObjv;
162     Tcl_Obj        *fieldNameObj;
163     int             i;
164     char           *fieldName;
165 
166     // the fields they want us to retrieve
167     if (Tcl_ListObjGetElements (interp, fieldListObj, &nFields, &fieldsObjv) == TCL_ERROR) {
168       return TCL_ERROR;
169     }
170 
171     sort->nFields = nFields;
172     sort->fields =  (int *)ckalloc (sizeof (int) * nFields);
173     sort->directions =  (int *)ckalloc (sizeof (int) * nFields);
174 
175     for (i = 0; i < nFields; i++) {
176 	int fieldNameNeedsFreeing = 0;
177 
178         fieldName = Tcl_GetString (fieldsObjv[i]);
179 	if (fieldName[0] == '-') {
180 	    sort->directions[i] = -1;
181 	    fieldName++;
182 	    fieldNameObj = Tcl_NewStringObj (fieldName, -1);
183 	    fieldNameNeedsFreeing = 1;
184 	} else {
185 	    fieldNameObj = fieldsObjv[i];
186 	    sort->directions[i] = 1;
187 	    fieldNameNeedsFreeing = 0;
188 	}
189 
190 	if (Tcl_GetIndexFromObj (interp, fieldNameObj, fieldNames, "field", TCL_EXACT, &sort->fields[i]) != TCL_OK) {
191 	    if (fieldNameNeedsFreeing) {
192 		Tcl_DecrRefCount (fieldNameObj);
193 	    }
194 	    ckfree ((char *)sort->fields);
195 	    ckfree ((char *)sort->directions);
196 	    sort->fields = NULL;
197 	    sort->directions = NULL;
198 	    return TCL_ERROR;
199 	}
200 
201 	if (fieldNameNeedsFreeing) {
202 	    Tcl_DecrRefCount (fieldNameObj);
203 	}
204     }
205     return TCL_OK;
206 }
207 
208 //
209 // ctable_searchMatchPatternCheck - examine the match pattern to determine
210 // a strategy for matching the pattern.
211 //
212 // If it's anchored, i.e. doesn't start with an asterisk, that's good to know,
213 // we'll be real fast.
214 //
215 // If it's a pattern, we'll use more full blown pattern matching.
216 //
217 // If it's unanchored, we'll use Boyer-Moore to go as fast as we can.
218 //
219 // There are oppportunities for optimization here, check out Peter's
220 // speedtable query optimizer for optimizations the determination of
221 // which can be made here instead.  Shrug.
222 //
223 CTABLE_INTERNAL int
ctable_searchMatchPatternCheck(char * s)224 ctable_searchMatchPatternCheck (char *s) {
225     char c;
226 if(!s) Tcl_Panic("ctable_searchMatchPatternCheck called with null");
227 
228     int firstCharIsStar = 0;
229     int lastCharIsStar = 0;
230 
231     if (*s == '\0') {
232 	return CTABLE_STRING_MATCH_ANCHORED;
233     }
234 
235     if (*s++ == '*') {
236 	firstCharIsStar = 1;
237     }
238 
239     while ((c = *s++) != '\0') {
240 	switch (c) {
241 	  case '*':
242 	    if (*s == '\0') {
243 		lastCharIsStar = 1;
244 	    } else {
245 		// some other * in the middle, too fancy
246 		return CTABLE_STRING_MATCH_PATTERN;
247 	    }
248 	    break;
249 
250 	  case '?':
251 	  case '[':
252 	  case ']':
253 	  case '\\':
254 	    return CTABLE_STRING_MATCH_PATTERN;
255 
256 	}
257     }
258 
259     if (firstCharIsStar) {
260 	if (lastCharIsStar) {
261 	    return CTABLE_STRING_MATCH_UNANCHORED;
262 	}
263 	// we could do a reverse anchored search here but i don't
264 	// think this comes up often enough (*pattern) to warrant it
265 	return CTABLE_STRING_MATCH_PATTERN;
266     }
267 
268     if (lastCharIsStar) {
269 	// first char is not star, last char is
270 	return CTABLE_STRING_MATCH_ANCHORED;
271     }
272 
273     // first char is not star, last char is not star, and there are
274     // no other metacharacters in the string. This is bad because they
275     // should be using "=" not "match" but we'll use pattern because
276     // that will actually do the right thing.  alternatively we could
277     // add another string match pattern type
278     return CTABLE_STRING_MATCH_PATTERN;
279 }
280 
ctable_CreateInRows(Tcl_Interp * interp,CTable * ctable,CTableSearchComponent * component)281 CTABLE_INTERNAL int ctable_CreateInRows(Tcl_Interp *interp, CTable *ctable, CTableSearchComponent *component)
282 {
283     int i;
284 
285     if(component->inListRows || component->inCount == 0)
286 	return TCL_OK;
287 
288     component->inListRows = (ctable_BaseRow **)ckalloc(component->inCount * sizeof (ctable_BaseRow *));
289 
290     // Since the main loop may abort, make sure this is clean
291     for(i = 0; i < component->inCount; i++) {
292 	component->inListRows[i] = NULL;
293     }
294 
295     for(i = 0; i < component->inCount; i++) {
296 	component->inListRows[i] = (*ctable->creator->make_empty_row) (ctable);
297 
298 	if ((*ctable->creator->set) (interp, ctable, component->inListObj[i], component->inListRows[i], component->fieldID, CTABLE_INDEX_PRIVATE) == TCL_ERROR) {
299 	    Tcl_AddErrorInfo (interp, "\n    while processing \"in\" compare function");
300 	    return TCL_ERROR;
301 	}
302     }
303     return TCL_OK;
304 }
305 
ctable_FreeInRows(CTable * ctable,CTableSearchComponent * component)306 CTABLE_INTERNAL void ctable_FreeInRows(CTable *ctable, CTableSearchComponent *component)
307 {
308     if(component->inListRows) {
309 	int i;
310 	for(i = 0; i < component->inCount; i++) {
311 	    if(component->inListRows[i]) {
312 	        ctable->creator->delete_row (ctable, component->inListRows[i], CTABLE_INDEX_PRIVATE);
313 	    }
314 	}
315 	ckfree((char*)component->inListRows);
316 	component->inListRows = NULL;
317     }
318 }
319 
320 static int
ctable_ParseSearch(Tcl_Interp * interp,CTable * ctable,Tcl_Obj * componentListObj,CONST char ** fieldNames,CTableSearch * search)321 ctable_ParseSearch (Tcl_Interp *interp, CTable *ctable, Tcl_Obj *componentListObj, CONST char **fieldNames, CTableSearch *search) {
322     Tcl_Obj    **componentList;
323     int          componentIdx;
324     int          componentListCount;
325 
326     Tcl_Obj    **termList;
327     int          termListCount;
328 
329     int          field;
330 
331     CTableSearchComponent  *components;
332     CTableSearchComponent  *component;
333 
334     static CONST char *searchTerms[] = CTABLE_SEARCH_TERMS;
335 
336     if (Tcl_ListObjGetElements (interp, componentListObj, &componentListCount, &componentList) == TCL_ERROR) {
337         return TCL_ERROR;
338     }
339 
340     if (componentListCount == 0) {
341         search->components = NULL;
342 	return TCL_OK;
343     }
344 
345     search->nComponents = componentListCount;
346 
347     components = (CTableSearchComponent *)ckalloc (componentListCount * sizeof (CTableSearchComponent));
348 
349     search->components = components;
350 
351     for (componentIdx = 0; componentIdx < componentListCount; componentIdx++) {
352         int term;
353 
354 	if (Tcl_ListObjGetElements (interp, componentList[componentIdx], &termListCount, &termList) == TCL_ERROR) {
355 	  err:
356 	    ckfree ((char *)components);
357 	    search->components = NULL;
358 	    return TCL_ERROR;
359 	}
360 
361 	if (termListCount < 2) {
362 	    // would be cool to support regexps here too
363 	    Tcl_WrongNumArgs (interp, 0, termList, "term field ?value..?");
364 	    goto err;
365 	}
366 
367 	if (Tcl_GetIndexFromObj (interp, termList[0], searchTerms, "term", TCL_EXACT, &term) != TCL_OK) {
368 	    goto err;
369 	}
370 
371 	if (Tcl_GetIndexFromObj (interp, termList[1], fieldNames, "field", TCL_EXACT, &field) != TCL_OK) {
372 	    goto err;
373 	}
374 
375 	component = &components[componentIdx];
376 
377 	component->comparisonType = term;
378 	component->fieldID = field;
379 	component->clientData = NULL;
380 	component->row1 = NULL;
381 	component->row2 = NULL;
382 	component->row3 = NULL;
383 	component->inListObj = NULL;
384 	component->inListRows = NULL;
385 	component->inCount = 0;
386 	component->compareFunction = ctable->creator->fields[field]->compareFunction;
387 
388 	if (term == CTABLE_COMP_FALSE || term == CTABLE_COMP_TRUE || term == CTABLE_COMP_NULL || term == CTABLE_COMP_NOTNULL) {
389 	    if (termListCount != 2) {
390 		Tcl_AppendResult (interp, "false, true, null and notnull search expressions must have only two fields", (char *) NULL);
391 		goto err;
392 	    }
393 	} else {
394 
395 	    if (term == CTABLE_COMP_IN) {
396 	        if (termListCount != 3) {
397 		    Tcl_AppendResult (interp, "term \"", Tcl_GetString (termList[0]), "\" require 3 arguments (term, field, list)", (char *) NULL);
398 		    goto err;
399 		}
400 
401 		if (Tcl_ListObjGetElements (interp, termList[2], &component->inCount, &component->inListObj) == TCL_ERROR) {
402 		    goto err;
403 		}
404 
405 	    } else if (term == CTABLE_COMP_RANGE) {
406 	        ctable_BaseRow *row;
407 
408 	        if (termListCount != 4) {
409 		    Tcl_AppendResult (interp, "term \"", Tcl_GetString (termList[0]), "\" require 4 arguments (term, field, lowValue, highValue)", (char *) NULL);
410 		    goto err;
411 		}
412 
413 		row = (*ctable->creator->make_empty_row) (ctable);
414 		if ((*ctable->creator->set) (interp, ctable, termList[2], row, field, CTABLE_INDEX_PRIVATE) == TCL_ERROR) {
415 		    goto err;
416 		}
417 		component->row1 = row;
418 
419 		row = (*ctable->creator->make_empty_row) (ctable);
420 		if ((*ctable->creator->set) (interp, ctable, termList[3], row, field, CTABLE_INDEX_PRIVATE) == TCL_ERROR) {
421 		    goto err;
422 		}
423 		component->row2 = row;
424 
425 		continue;
426 
427 	    } else if (termListCount != 3) {
428 		Tcl_AppendResult (interp, "term \"", Tcl_GetString (termList[0]), "\" require 3 arguments (term, field, value)", (char *) NULL);
429 		goto err;
430 	    }
431 
432 	    if ((term == CTABLE_COMP_MATCH) || (term == CTABLE_COMP_NOTMATCH) || (term == CTABLE_COMP_MATCH_CASE) || (term == CTABLE_COMP_NOTMATCH_CASE)) {
433 		// Check if field that supports string matches
434 		int ftype = ctable->creator->fieldTypes[field];
435 		if(ftype != CTABLE_TYPE_FIXEDSTRING && ftype != CTABLE_TYPE_VARSTRING && ftype != CTABLE_TYPE_KEY) {
436 		    Tcl_AppendResult (interp, "term \"", Tcl_GetString (termList[1]), "\" must be a string type for \"", Tcl_GetString (termList[0]), "\" operation", (char *) NULL);
437 		    goto err;
438 		}
439 
440 		struct ctableSearchMatchStruct *sm = (struct ctableSearchMatchStruct *)ckalloc (sizeof (struct ctableSearchMatchStruct));
441 
442 		sm->type = ctable_searchMatchPatternCheck (Tcl_GetString (termList[2]));
443 		sm->nocase = ((term == CTABLE_COMP_MATCH) || (term == CTABLE_COMP_NOTMATCH));
444 
445 		if (sm->type == CTABLE_STRING_MATCH_UNANCHORED) {
446 		    char *needle;
447 		    int len;
448 
449 		    needle = Tcl_GetStringFromObj (termList[2], &len);
450 		    boyer_moore_setup (sm, (unsigned char *)needle + 1, len - 2, sm->nocase);
451 		} else if(sm->type == CTABLE_STRING_MATCH_ANCHORED && term == CTABLE_COMP_MATCH_CASE) {
452 		    int len;
453 		    char *needle = Tcl_GetStringFromObj (termList[2], &len);
454 		    char *prefix = (char *) ckalloc(len+1);
455 		    int i;
456 
457 		    /* stash the prefix of the match into row2 */
458 		    for(i = 0; i < len; i++) {
459 			if(needle[i] == '*') {
460 			    break;
461 			}
462 			prefix[i] = needle[i];
463 			break;
464 		    }
465 
466 		    // This test should never fail.
467 		    if(i > 0) {
468 		        ctable_BaseRow *row;
469 		        prefix[i] = '\0';
470 
471 		        row = (*ctable->creator->make_empty_row) (ctable);
472 	    		if ((*ctable->creator->set) (interp, ctable, Tcl_NewStringObj (prefix, -1), row, field, CTABLE_INDEX_PRIVATE) == TCL_ERROR) {
473 			    goto err;
474 	    	        }
475 	    		component->row2 = row;
476 
477 			// Now set up row3 as the first non-matching string
478 			prefix[i-1]++;
479 
480 		        row = (*ctable->creator->make_empty_row) (ctable);
481 	    		if ((*ctable->creator->set) (interp, ctable, Tcl_NewStringObj (prefix, -1), row, field, CTABLE_INDEX_PRIVATE) == TCL_ERROR) {
482 			    goto err;
483 	    	        }
484 	    		component->row3 = row;
485 
486 		    }
487 		    ckfree(prefix);
488 		}
489 
490 		component->clientData = sm;
491 	    }
492 
493 	    /* stash what we want to compare to into a row as in "range"
494 	     */
495 	    if (term != CTABLE_COMP_IN) {
496 		ctable_BaseRow *row;
497 		row = (*ctable->creator->make_empty_row) (ctable);
498 		if ((*ctable->creator->set) (interp, ctable, termList[2], row, field, CTABLE_INDEX_PRIVATE) == TCL_ERROR) {
499 			goto err;
500 		}
501 		component->row1 = row;
502 	    }
503 	}
504     }
505 
506     // it worked, leave the components allocated
507     return TCL_OK;
508 }
509 
510 static int
ctable_ParseFilters(Tcl_Interp * interp,CTable * ctable,Tcl_Obj * filterListObj,CTableSearch * search)511 ctable_ParseFilters (Tcl_Interp *interp, CTable *ctable, Tcl_Obj *filterListObj, CTableSearch *search) {
512     Tcl_Obj               **filterList;
513     int                     filterListCount;
514 
515     CTableSearchFilter     *filters = NULL;
516 
517     int i;
518 
519     // Assume nothing, trust no one.
520     search->filters = NULL;
521     search->nFilters = 0;
522 
523     if (Tcl_ListObjGetElements (interp, filterListObj, &filterListCount, &filterList) == TCL_ERROR)
524         goto abend;
525 
526     // Nothing to do
527     if (filterListCount == 0) {
528         return TCL_OK;
529     }
530 
531 
532     // List is {{filtername filtervalue} {filtername filtervalue}} .. if passing a list to a filter, pass a list
533 
534     // make room, make room
535     filters = (CTableSearchFilter *)ckalloc (filterListCount * sizeof (CTableSearchFilter));
536 
537     // step through list, looking for filter names and filling in the structs
538     for (i = 0; i < filterListCount; i++) {
539         Tcl_Obj    **termList;
540         int          termListCount;
541         int item;
542 
543         if (Tcl_ListObjGetElements (interp, filterList[i], &termListCount, &termList) == TCL_ERROR || termListCount != 2) {
544           Tcl_AppendResult (interp, "each term of the filter must be a nested list with 2 items", (char *) NULL);
545           goto abend;
546         }
547 
548 
549         if (Tcl_GetIndexFromObj (interp, termList[0], ctable->creator->filterNames, "filter", TCL_EXACT, &item) != TCL_OK)
550             goto abend;
551 
552         filters[i].filterFunction = ctable->creator->filterFunctions[item];
553         filters[i].filterObject = termList[1];
554     }
555 
556     // Register the filter list we've created
557     search->filters = filters;
558     search->nFilters = filterListCount;
559     return TCL_OK;
560 
561 abend:
562     if(filters)
563         ckfree ((char *)filters);
564     return TCL_ERROR;
565 }
566 
567 //
568 // ctable_SearchAction - Perform the search action on a row that's matched
569 //  the search criteria.
570 //
571 static int
ctable_SearchAction(Tcl_Interp * interp,CTable * ctable,CTableSearch * search,ctable_BaseRow * row)572 ctable_SearchAction (Tcl_Interp *interp, CTable *ctable, CTableSearch *search, ctable_BaseRow *row) {
573     char           *key;
574     ctable_CreatorTable *creator = ctable->creator;
575 
576     key = row->hashEntry.key;
577 
578     if (search->action == CTABLE_SEARCH_ACTION_WRITE_TABSEP) {
579 	Tcl_DString     dString;
580 
581 	Tcl_DStringInit (&dString);
582 
583 	// string-append the specified fields, or all fields, tab separated
584 
585 	if (search->nRetrieveFields < 0) {
586 	    (*creator->dstring_append_get_tabsep) (key, row, creator->publicFieldList, creator->nPublicFields, &dString, search->noKeys, search->sepstr, search->quoteType, search->nullString);
587 	} else {
588 	    (*creator->dstring_append_get_tabsep) (key, row, search->retrieveFields, search->nRetrieveFields, &dString, search->noKeys, search->sepstr, search->quoteType, search->nullString);
589 	}
590 
591 	// write the line out
592 
593 	if (Tcl_WriteChars (search->tabsepChannel, Tcl_DStringValue (&dString), Tcl_DStringLength (&dString)) < 0) {
594 	    return TCL_ERROR;
595 	}
596 
597 	Tcl_DStringFree (&dString);
598 	return TCL_OK;
599     }
600 
601     // if there's a code body to eval...
602 
603     if (search->codeBody != NULL) {
604 	Tcl_Obj *listObj = NULL;
605 	int      evalResult;
606 
607 	// generate the list of requested fields, or all fields, in
608 	// "get style" (value only), "array get style" (key-value
609 	// pairs with nulls suppressed) and "array get with nulls"
610 	// style, (all requested fields, null or not)
611 
612 	switch (search->action) {
613 
614 	  case CTABLE_SEARCH_ACTION_GET: {
615 	    if (search->nRetrieveFields < 0) {
616 		listObj = creator->gen_list (interp, row);
617 	    } else {
618 	       int i;
619 
620 	       listObj = Tcl_NewObj ();
621 	       for (i = 0; i < search->nRetrieveFields; i++) {
622 		   creator->lappend_field (interp, listObj, row, search->retrieveFields[i]);
623 	       }
624 	    }
625 	    break;
626 	  }
627 
628 	  case CTABLE_SEARCH_ACTION_ARRAY_WITH_NULLS:
629 	  case CTABLE_SEARCH_ACTION_ARRAY: {
630 	    int result = TCL_OK;
631 
632 	    // Clear array before filling it in. Ignore failure because it's
633 	    // OK for the array not to exist at this point.
634 	    Tcl_UnsetVar2(interp, Tcl_GetString(search->rowVarNameObj), NULL, 0);
635 
636 	    if (search->nRetrieveFields < 0) {
637 	       int i;
638 
639 	       for (i = 0; i < creator->nFields; i++) {
640 		   if (is_hidden_field(creator,i) && !is_key_field(creator,i,search->noKeys)) {
641 		       continue;
642 		   }
643 	           if (search->action == CTABLE_SEARCH_ACTION_ARRAY) {
644 		       result = creator->array_set (interp, search->rowVarNameObj, row, i);
645 		   } else {
646 		       result = creator->array_set_with_nulls (interp, search->rowVarNameObj, row, i);
647 		   }
648 	       }
649 	    } else {
650 	       int i;
651 
652 	       for (i = 0; i < search->nRetrieveFields; i++) {
653 	           if (search->action == CTABLE_SEARCH_ACTION_ARRAY) {
654 		       result = creator->array_set (interp, search->rowVarNameObj, row, search->retrieveFields[i]);
655 		   } else {
656 		       result = creator->array_set_with_nulls (interp, search->rowVarNameObj, row, search->retrieveFields[i]);
657 		   }
658 	       }
659 	    }
660 
661 	    if (result != TCL_OK) {
662 	        return result;
663 	    }
664 	    break;
665 	  }
666 
667 	  case CTABLE_SEARCH_ACTION_ARRAY_GET: {
668 	    int i;
669 
670 	    listObj = Tcl_NewObj ();
671 	    if (search->nRetrieveFields < 0) {
672 		for (i = 0; i < creator->nFields; i++) {
673 		    if (is_hidden_field(creator,i) && !is_key_field(creator,i,search->noKeys)) {
674 			continue;
675 		    }
676 		    creator->lappend_nonnull_field_and_name (interp, listObj, row, i);
677 		 }
678 	    } else {
679 		for (i = 0; i < search->nRetrieveFields; i++) {
680 		    creator->lappend_nonnull_field_and_name (interp, listObj, row, search->retrieveFields[i]);
681 		}
682 	    }
683 	    break;
684 	  }
685 
686 	  case CTABLE_SEARCH_ACTION_ARRAY_GET_WITH_NULLS: {
687 	    int i;
688 
689 	    listObj = Tcl_NewObj ();
690 	    if (search->nRetrieveFields < 0) {
691 		for (i = 0; i < creator->nFields; i++) {
692 		    if (is_hidden_field(creator,i) && !is_key_field(creator,i,search->noKeys)) {
693 			continue;
694 		    }
695 		    creator->lappend_field_and_name (interp, listObj, row, i);
696 		}
697 	    } else {
698 		listObj = Tcl_NewObj ();
699 		for (i = 0; i < search->nRetrieveFields; i++) {
700 		    creator->lappend_field_and_name (interp, listObj, row, search->retrieveFields[i]);
701 		}
702 	    }
703 	    break;
704 	  }
705 
706 	  default: {
707 	      if (search->keyVarNameObj == NULL) {
708 	          Tcl_Panic ("software failure - unhandled search action");
709 	      }
710 	  }
711 	}
712 
713 	// if the key var is defined, set the key into it
714 	if (search->keyVarNameObj != NULL) {
715 	    if (Tcl_ObjSetVar2 (interp, search->keyVarNameObj, (Tcl_Obj *)NULL, Tcl_NewStringObj (key, -1), TCL_LEAVE_ERR_MSG) == (Tcl_Obj *) NULL) {
716 		return TCL_ERROR;
717 	    }
718 	}
719 
720 	// set the returned list into the value var
721 	if ((listObj != NULL) && (Tcl_ObjSetVar2 (interp, search->rowVarNameObj, (Tcl_Obj *)NULL, listObj, TCL_LEAVE_ERR_MSG) == (Tcl_Obj *) NULL)) {
722 	    return TCL_ERROR;
723 	}
724 
725 	// evaluate the code body
726 	//
727 	// By using a Tcl object for the code body, the code body will be
728         // on-the-fly compiled by Tcl once and cached on subsequent
729 	// evals.  Cool.
730 	//
731 	evalResult = Tcl_EvalObjEx (interp, search->codeBody, 0);
732 	switch (evalResult) {
733 	  case TCL_ERROR:
734 	    Tcl_AddErrorInfo (interp, "\n   while processing search code body");
735 	    return TCL_ERROR;
736 
737 	  case TCL_OK:
738 	  case TCL_CONTINUE:
739 	  case TCL_BREAK:
740 	    Tcl_ResetResult(interp);
741 	  case TCL_RETURN:
742 	    return evalResult;
743 	}
744     }
745 
746     return TCL_OK;
747 }
748 
749 //
750 // ctable_checkForKey - check for a "key" field, and if there set the internal
751 // noKeys flag to suppress the separate output of the "_key" field.
752 //
ctable_checkForKey(CTable * ctable,CTableSearch * search)753 CTABLE_INTERNAL void ctable_checkForKey(CTable *ctable, CTableSearch *search)
754 {
755     if (!search->noKeys) {
756 	int		     i;
757         int                 *fields;
758         int                  nFields;
759         ctable_CreatorTable *creator = ctable->creator;
760 
761         if (search->nRetrieveFields < 0) {
762 	    fields = creator->publicFieldList;
763 	    nFields = creator->nPublicFields;
764         } else {
765 	    nFields = search->nRetrieveFields;
766 	    fields = search->retrieveFields;
767         }
768 
769         for (i = 0; i < nFields; i++) {
770 	    if(fields[i] == creator->keyField) {
771 		search->noKeys = 1;
772 		break;
773 	    }
774 	}
775     }
776 }
777 
778 //
779 // ctable_WriteFieldNames - write field names from a search structure to
780 //   the specified channel, tab-separated
781 //
782 //
783 static int
ctable_WriteFieldNames(Tcl_Interp * interp,CTable * ctable,CTableSearch * search)784 ctable_WriteFieldNames (Tcl_Interp *interp, CTable *ctable, CTableSearch *search)
785 {
786     int i;
787     Tcl_DString          dString;
788     int                 *fields;
789     int                  nFields;
790     ctable_CreatorTable *creator = ctable->creator;
791 
792     Tcl_DStringInit (&dString);
793 
794     if (search->nRetrieveFields < 0) {
795 	fields = creator->publicFieldList;
796 	nFields = creator->nPublicFields;
797     } else {
798 	nFields = search->nRetrieveFields;
799 	fields = search->retrieveFields;
800     }
801 
802     // Generate key field if necessary
803     if (!search->noKeys)
804         Tcl_DStringAppend (&dString, "_key", 4);
805 
806     for (i = 0; i < nFields; i++) {
807 	if (!search->noKeys || i != 0) {
808 	    Tcl_DStringAppend(&dString, search->sepstr, -1);
809 	}
810 
811 	Tcl_DStringAppend(&dString, creator->fields[fields[i]]->name, -1);
812     }
813     Tcl_DStringAppend(&dString, "\n", 1);
814 
815     if (Tcl_WriteChars (search->tabsepChannel, Tcl_DStringValue (&dString), Tcl_DStringLength (&dString)) < 0) {
816 	Tcl_DStringFree (&dString);
817 	return TCL_ERROR;
818     }
819 
820     Tcl_DStringFree (&dString);
821     return TCL_OK;
822 }
823 
824 //
825 // ctable_PerformTransactions - atomic actions taken at the end of a search
826 //
827 static int
ctable_PerformTransaction(Tcl_Interp * interp,CTable * ctable,CTableSearch * search)828 ctable_PerformTransaction (Tcl_Interp *interp, CTable *ctable, CTableSearch *search)
829 {
830     int fieldIndex, rowIndex;
831     ctable_CreatorTable *creator = ctable->creator;
832 
833     if(search->tranType == CTABLE_SEARCH_TRAN_NONE || search->tranType == CTABLE_SEARCH_TRAN_CURSOR) {
834 	return TCL_OK;
835     }
836 
837     if(search->tranType == CTABLE_SEARCH_TRAN_DELETE) {
838 
839       // walk the result and delete the matched rows
840       for (rowIndex = search->offset; rowIndex < search->offsetLimit; rowIndex++) {
841 	  (*creator->delete_row) (ctable, search->tranTable[rowIndex], CTABLE_INDEX_NORMAL);
842 	  ctable->count--;
843       }
844 
845       return TCL_OK;
846 
847     }
848 
849     if(search->tranType == CTABLE_SEARCH_TRAN_UPDATE) {
850 
851 	int objc;
852 	Tcl_Obj **objv;
853 	int *updateFields;
854 
855 	// Convert the tranData field into a list
856         if (Tcl_ListObjGetElements (interp, search->tranData, &objc, &objv) == TCL_ERROR) {
857 updateParseError:
858 	    Tcl_AppendResult (interp, " parsing argument to -update", (char *)NULL);
859 	    return TCL_ERROR;
860 	}
861 
862 	if(objc & 1) {
863 	    Tcl_AppendResult (interp, "-update list must contain an even number of elements", (char *)NULL);
864             return TCL_ERROR;
865         }
866 
867 	// get some space for the field indices
868 	updateFields = (int *)ckalloc (sizeof (int) * (objc/2));
869 
870 	// Convert the names to field indices
871 	for(fieldIndex = 0; fieldIndex < objc/2; fieldIndex++) {
872 	    if (Tcl_GetIndexFromObj (interp, objv[fieldIndex*2], creator->fieldNames, "field", TCL_EXACT, &updateFields[fieldIndex]) != TCL_OK) {
873 	        ckfree((char*)updateFields);
874 		goto updateParseError;
875 	    }
876 	}
877 
878 	// Perform update
879         for (rowIndex = search->offset; rowIndex < search->offsetLimit; rowIndex++) {
880 	    ctable_BaseRow *row = search->tranTable[rowIndex];
881 
882 	    for(fieldIndex = 0; fieldIndex < objc/2; fieldIndex++) {
883 		if((*creator->set)(interp, ctable, objv[fieldIndex*2+1], row, updateFields[fieldIndex], CTABLE_INDEX_NORMAL) == TCL_ERROR) {
884 		    ckfree((char*)updateFields);
885 		    Tcl_AppendResult (interp, " (update may be incomplete)", (char *)NULL);
886 		    return TCL_ERROR;
887 		}
888 	    }
889 	}
890 
891 	ckfree((char*)updateFields);
892 
893         return TCL_OK;
894     }
895 
896     Tcl_AppendResult(interp, "internal error - unknown transaction type %d", search->tranType);
897 
898     return TCL_ERROR;
899 }
900 
901 //
902 // ctable_search_poll - called periodically from a search to avoid blocking
903 // the Tcl event loop.
904 //
ctable_search_poll(Tcl_Interp * interp,CTable * ctable,CTableSearch * search)905 CTABLE_INTERNAL int ctable_search_poll(Tcl_Interp *interp, CTable *ctable, CTableSearch *search)
906 {
907     if(search->pollCodeBody) {
908 	int result = Tcl_EvalObjEx (interp, search->pollCodeBody, 0);
909 	if(result == TCL_ERROR) {
910 	    Tcl_BackgroundError(interp);
911 	    Tcl_ResetResult(interp);
912 	}
913     } else {
914 	Tcl_DoOneEvent(0);
915     }
916     return TCL_OK;
917 }
918 
919 //
920 // ctable_PostSearchCommonActions - actions taken at the end of a search
921 //
922 // If results sorting is required, we sort the results.
923 //
924 // We interpret start and offset, if set, to limit rows returned.
925 //
926 // We walk the sort results, calling ctable_SearchAction on each.
927 //
928 // If transaction processing is required, we call ctable_PerformTransactions
929 //
930 //
931 static int
ctable_PostSearchCommonActions(Tcl_Interp * interp,CTable * ctable,CTableSearch * search)932 ctable_PostSearchCommonActions (Tcl_Interp *interp, CTable *ctable, CTableSearch *search)
933 {
934     int walkIndex;
935     ctable_CreatorTable *creator = ctable->creator;
936     int actionResult = TCL_OK;
937 
938     // if there was nothing matched, or we're before the offset, we're done
939     if(search->matchCount == 0 || search->offset > search->matchCount) {
940 	// And we didn't match anything
941 	search->matchCount = 0;
942 	return TCL_OK;
943     }
944 
945     // if we're not sorting or performing a transaction, we're done -- we
946     // did 'em all on the fly
947     if (search->tranTable == NULL) {
948 	// But account for the offset in the matchCount
949 	search->matchCount -= search->offset;
950         return TCL_OK;
951     }
952 
953     // figure out the last row they could want, if it's more than what's
954     // there, set it down to what came back
955     if ((search->offsetLimit == 0) || (search->offsetLimit > search->matchCount)) {
956         search->offsetLimit = search->matchCount;
957     }
958 
959     if(search->sortControl.nFields) {	// sorting
960       ctable_qsort_r (search->tranTable, search->matchCount, sizeof (ctable_HashEntry *), &search->sortControl, (cmp_t*) creator->sort_compare);
961     }
962 
963     if (search->tranType == CTABLE_SEARCH_TRAN_CURSOR) {
964 	search->cursor = ctable_CreateCursor(interp, ctable, search);
965     } else if(search->bufferResults == CTABLE_BUFFER_DEFER) { // we deferred the operation to here
966         // walk the result
967         for (walkIndex = search->offset; walkIndex < search->offsetLimit; walkIndex++) {
968 	    if(search->pollInterval && --search->nextPoll <= 0) {
969 		if(ctable_search_poll(interp, ctable, search) == TCL_ERROR)
970 		    return TCL_ERROR;
971 		search->nextPoll = search->pollInterval;
972 	    }
973 	    /* here is where we want to take the match actions
974 	     * when we are sorting or otherwise buffering
975 	     */
976 	    actionResult = ctable_SearchAction (interp, ctable, search, search->tranTable[walkIndex]);
977 	    switch (actionResult) {
978 	        case TCL_CONTINUE:
979 	        case TCL_OK: {
980 	            actionResult = TCL_OK;
981 	            break;
982 	        }
983 	        case TCL_BREAK:
984 	        case TCL_RETURN: {
985 		    // We DO count this row as a match, to be consistent with
986 		    // the non-deferred case.
987 		    search->offsetLimit = walkIndex + 1;
988 	            goto normal_return;
989 	        }
990 	        case TCL_ERROR: {
991 	            return TCL_ERROR;
992 	        }
993 	    }
994         }
995     }
996 
997 normal_return:
998     // Calculate the final match count
999     search->matchCount = search->offsetLimit - search->offset;
1000 
1001     // Finally, perform any pending transaction.
1002     if(search->tranType != CTABLE_SEARCH_TRAN_NONE && search->tranType !=  CTABLE_SEARCH_TRAN_CURSOR) {
1003 	if (ctable_PerformTransaction(interp, ctable, search) == TCL_ERROR) {
1004 	    return TCL_ERROR;
1005 	}
1006     }
1007 
1008     return actionResult;
1009 }
1010 
1011 //
1012 // ctable_SearchCompareRow - perform comparisons on a row
1013 //
1014 INLINE static int
ctable_SearchCompareRow(Tcl_Interp * interp,CTable * ctable,CTableSearch * search,ctable_BaseRow * row)1015 ctable_SearchCompareRow (Tcl_Interp *interp, CTable *ctable, CTableSearch *search, ctable_BaseRow *row)
1016 {
1017     int   compareResult;
1018     int   actionResult;
1019 
1020     // Handle polling
1021     if(search->pollInterval && --search->nextPoll <= 0) {
1022 	if(ctable_search_poll(interp, ctable, search) == TCL_ERROR)
1023 	    return TCL_ERROR;
1024 	search->nextPoll = search->pollInterval;
1025     }
1026 
1027     // if we have a match pattern (for the key) and it doesn't match,
1028     // skip this row
1029 
1030     if (search->pattern != (char *) NULL) {
1031 	if (!Tcl_StringCaseMatch (row->hashEntry.key, search->pattern, 1)) {
1032 	    return TCL_CONTINUE;
1033 	}
1034     }
1035 
1036     // check filters
1037     if (search->nFilters) {
1038 	int i;
1039 	int filterResult = TCL_OK;
1040 
1041 	for(i = 0; i < search->nFilters; i++) {
1042 	    filterFunction_t f = search->filters[i].filterFunction;
1043 	    filterResult = (*f) (interp, ctable, row, search->filters[i].filterObject, search->sequence);
1044 	    if(filterResult != TCL_OK)
1045 		return filterResult;
1046 	}
1047     }
1048 
1049     //
1050     // run the supplied compare routine
1051     //
1052     compareResult = (*ctable->creator->search_compare) (interp, search, row);
1053     if (compareResult == TCL_CONTINUE) {
1054 	return TCL_CONTINUE;
1055     }
1056 
1057     if (compareResult == TCL_ERROR) {
1058 	return TCL_ERROR;
1059     }
1060 
1061     // It's a Match
1062     // Are we sorting? Plop the match in the sort table and return
1063     // Increment count in this block to make sure it's incremented in all
1064     // paths.
1065 
1066     if (search->tranTable == NULL) {
1067 	++search->matchCount;
1068     } else {
1069 	/* We are buffering the results (eg, for a sort or a transaction)
1070 	 * so just return, we'll do the heavy lifting later. */
1071 	assert (search->matchCount <= ctable->count);
1072 	search->tranTable[search->matchCount++] = row;
1073 
1074 	// If buffering for an important reason (eg, sorting), defer
1075 	if(search->bufferResults == CTABLE_BUFFER_DEFER)
1076 	    return TCL_CONTINUE;
1077     }
1078 
1079     // We're not deferring the results, let's figure out what to do as we
1080     // match. If we haven't met the start point, blow it off.
1081     if (search->matchCount <= search->offset) {
1082 	return TCL_CONTINUE;
1083     }
1084 
1085     if (search->action == CTABLE_SEARCH_ACTION_NONE) {
1086 	// we're only counting -- if there is a limit and it's been
1087 	// met, we're done
1088 	if ((search->limit != 0) && (search->matchCount >= search->offsetLimit)) {
1089 	    return TCL_BREAK;
1090 	}
1091 
1092 	// the limit hasn't been exceeded or there isn't one,
1093 	// so we keep counting -- but we continue here because
1094 	// we don't need to do any processing on the line
1095 	return TCL_CONTINUE;
1096     }
1097 
1098     /* we want to take the match actions here -- we're here when we aren't
1099      * buffering (at least not for an "important" reason)
1100      */
1101     actionResult = ctable_SearchAction (interp, ctable, search, row);
1102     if (actionResult == TCL_ERROR) {
1103 	return TCL_ERROR;
1104     }
1105 
1106     if ((actionResult == TCL_CONTINUE) || (actionResult == TCL_OK)) {
1107 	// if there was a limit and we've met it, we're done
1108 	if ((search->limit != 0) && (search->matchCount >= search->offsetLimit)) {
1109 	    return TCL_BREAK;
1110 	}
1111 	return TCL_CONTINUE;
1112     }
1113 
1114     if ((actionResult == TCL_BREAK) || (actionResult == TCL_RETURN)) {
1115 	return actionResult;
1116     }
1117 
1118     Tcl_Panic("software failure - unhandled SearchAction return");
1119     return TCL_ERROR;
1120 }
1121 
1122 enum skipStart_e {
1123     SKIP_START_NONE, SKIP_START_GE_ROW1, SKIP_START_GT_ROW1, SKIP_START_EQ_ROW1, SKIP_START_RESET
1124 };
1125 enum skipEnd_e {
1126     SKIP_END_NONE, SKIP_END_NE_ROW1, SKIP_END_GE_ROW1, SKIP_END_GT_ROW1, SKIP_END_GE_ROW2
1127 };
1128 enum skipNext_e {
1129     SKIP_NEXT_NONE, SKIP_NEXT_ROW, SKIP_NEXT_MATCH, SKIP_NEXT_IN_LIST
1130 };
1131 
1132 // skiplist scanning rules table.
1133 //   scan starts at skipStart, goes to skipEnd, and uses skipNext to traverse.
1134 //   score is based on an estimate of how much a scan on this field is likely
1135 //   to limit the search.
1136 // rationale:
1137 //   <, <=, or >= are 2 points for being constrained at one end
1138 //   > is only 1 point because we need to loop over "=" at start
1139 //   range is 4 points for being constrained at both ends
1140 //   = is 6 points for being a "minimum range"
1141 //   in is 100 points (and win) because it HAS to be done over a list
1142 // plus one point for being the sort field, because that saves creating and
1143 //   sorting the transaction table.
1144 //
1145 static struct {
1146     enum skipStart_e	skipStart;
1147     enum skipEnd_e	skipEnd;
1148     enum skipNext_e	skipNext;
1149     int			score;
1150 } skipTypes[] = {
1151   {SKIP_START_NONE,	SKIP_END_NONE,	  SKIP_NEXT_NONE, -2 }, // FALSE
1152   {SKIP_START_NONE,	SKIP_END_NONE,	  SKIP_NEXT_NONE, -2 }, // TRUE
1153   {SKIP_START_NONE,	SKIP_END_NONE,	  SKIP_NEXT_NONE, -2 }, // NULL
1154   {SKIP_START_NONE,	SKIP_END_NONE,	  SKIP_NEXT_NONE, -2 }, // NOTNULL
1155   {SKIP_START_RESET,	SKIP_END_GE_ROW1, SKIP_NEXT_ROW,   2 }, // LT
1156   {SKIP_START_RESET,	SKIP_END_GT_ROW1, SKIP_NEXT_ROW,   2 }, // LE
1157   {SKIP_START_EQ_ROW1,	SKIP_END_GT_ROW1, SKIP_NEXT_ROW,   6 }, // EQ
1158   {SKIP_START_NONE,	SKIP_END_NONE,	  SKIP_NEXT_NONE, -2 }, // NE
1159   {SKIP_START_GE_ROW1,	SKIP_END_NONE,	  SKIP_NEXT_ROW,   2 }, // GE
1160   {SKIP_START_GT_ROW1,	SKIP_END_NONE,	  SKIP_NEXT_ROW,   1 }, // GT
1161   {SKIP_START_NONE,	SKIP_END_NONE,	  SKIP_NEXT_NONE, -2 }, // MATCH
1162   {SKIP_START_NONE,	SKIP_END_NONE,	  SKIP_NEXT_NONE, -2 }, // NOTMATCH
1163   {SKIP_START_GE_ROW1,	SKIP_END_GE_ROW2, SKIP_NEXT_MATCH, 3 }, // MATCH_CASE
1164   {SKIP_START_NONE,	SKIP_END_NONE,	  SKIP_NEXT_NONE, -2 }, // NOTMATCH_CASE
1165   {SKIP_START_GE_ROW1,	SKIP_END_GE_ROW2, SKIP_NEXT_ROW,   4 }, // RANGE
1166   {SKIP_START_RESET,	SKIP_END_NONE, SKIP_NEXT_IN_LIST,  6 }  // IN
1167 };
1168 
1169 #define SORT_SCORE 1 // being able to sort is worth 1 point
1170      // TODO: see if this should be higher
1171      //       if you change this, change the "not implemented"
1172      //       value in the table above to -1 - SORT_SCORE
1173 
1174 enum walkType_e { WALK_DEFAULT, WALK_SKIP, WALK_HASH_EQ, WALK_HASH_IN };
1175 
1176 static enum walkType_e hashTypes[] = {
1177   WALK_DEFAULT, WALK_DEFAULT, WALK_DEFAULT, WALK_DEFAULT, // FALSE..NOTNULL
1178   WALK_DEFAULT, WALK_DEFAULT, WALK_HASH_EQ, WALK_DEFAULT, // LT..NE
1179   WALK_DEFAULT, WALK_DEFAULT, WALK_DEFAULT, WALK_DEFAULT, // GT..NOTMATCH
1180   WALK_DEFAULT, WALK_DEFAULT, WALK_DEFAULT, WALK_HASH_IN  // MATCH_CASE..IN
1181 };
1182 
1183 #ifdef WITH_SHARED_TABLES
1184 struct restart_t {
1185     ctable_BaseRow	  *row1;
1186     ctable_BaseRow	  *row2;
1187     enum skipStart_e	   skipStart;
1188     enum skipEnd_e	   skipEnd;
1189     fieldCompareFunction_t compareFunction;
1190 };
1191 
ctable_SearchRestartNeeded(ctable_BaseRow * row,struct restart_t * restart)1192 CTABLE_INTERNAL int ctable_SearchRestartNeeded(ctable_BaseRow *row, struct restart_t *restart)
1193 {
1194     // check to see if we're still following the right skiplist, by seeing
1195     // if the value we're following still satisfies the criteria
1196     switch(restart->skipEnd) {
1197 	case SKIP_END_GE_ROW1: {
1198 	    if (restart->compareFunction (row, restart->row1) >= 0)
1199 		return 1;
1200 	    break;
1201 	}
1202 	case SKIP_END_GT_ROW1: {
1203 	    if (restart->compareFunction (row, restart->row1) > 0)
1204 		return 1;
1205 	    break;
1206 	}
1207 	case SKIP_END_GE_ROW2: {
1208 	    if (restart->compareFunction (row, restart->row2) >= 0)
1209 		return 1;
1210 	    break;
1211 	}
1212 	default: {
1213 	    break;
1214 	}
1215     }
1216     switch(restart->skipStart) {
1217         case SKIP_START_GE_ROW1: {
1218 	    if (restart->compareFunction (row, restart->row1) < 0)
1219 		return 1;
1220 	    break;
1221 	}
1222 	case SKIP_START_GT_ROW1: {
1223 	    if (restart->compareFunction (row, restart->row1) <= 0)
1224 		return 1;
1225 	    break;
1226 	}
1227 	case SKIP_START_EQ_ROW1: {
1228 	    if (restart->compareFunction (row, restart->row1) != 0)
1229 		return 1;
1230 	    break;
1231 	}
1232         case SKIP_START_NONE:
1233 	case SKIP_START_RESET: {
1234 	    break;
1235 	}
1236     }
1237     // Got through here... no restart needed
1238     return 0;
1239 }
1240 #endif
1241 
1242 //
1243 // ctable_PrepareTransactions - set up buffering for transactions if needed
1244 //
ctable_PrepareTransactions(CTable * ctable,CTableSearch * search)1245 CTABLE_INTERNAL void ctable_PrepareTransactions(CTable *ctable, CTableSearch *search)
1246 {
1247     // buffer results if:
1248     //   We've got search actions, AND...
1249     //     We're searching, or...
1250     //     We're a reader table, or...
1251     //     We explicitly requested bufering.
1252     if (search->action == CTABLE_SEARCH_ACTION_NONE) {
1253 	search->bufferResults = CTABLE_BUFFER_NONE;
1254     } else if(search->action == CTABLE_SEARCH_ACTION_CURSOR) {
1255 	search->bufferResults = CTABLE_BUFFER_DEFER;
1256     } else if(search->sortControl.nFields > 0) {
1257 	search->bufferResults = CTABLE_BUFFER_DEFER;
1258 #ifdef WITH_SHARED_TABLES
1259     } else if(ctable->share_type == CTABLE_SHARED_READER) {
1260 	search->bufferResults = CTABLE_BUFFER_DEFER;
1261 #endif
1262     } else if(search->bufferResults < 0) {
1263     	search->bufferResults = CTABLE_BUFFER_NONE;
1264     }
1265 
1266     // If we're not buffering for any other reason, buffer for a transaction
1267     // but set bufferResults to PROVISIONAL so we can still terminate the
1268     // search early
1269     if(search->bufferResults == CTABLE_BUFFER_NONE && search->tranType != CTABLE_SEARCH_TRAN_NONE) {
1270 	search->bufferResults = CTABLE_BUFFER_PROVISIONAL;
1271     }
1272 
1273     // if we're buffering,
1274     // allocate a space for the search results that we'll then sort from
1275     if (search->bufferResults != CTABLE_BUFFER_NONE) {
1276 	search->tranTable = (ctable_BaseRow **)ckalloc (sizeof (ctable_BaseRow *) * ctable->count);
1277     }
1278 }
1279 
1280 //
1281 // ctable_PerformSearch - perform the search
1282 //
1283 // write field names if we need to
1284 //
1285 // for each row in the table, apply search compare test in turn
1286 //    the first one that comes up that the row should be excluded ends
1287 //    looking at that one
1288 //
1289 //    if nothing excluded it, we pick it -- this can mean taking the action
1290 //    immediately or, if sorting, picking the object for sorting and then
1291 //    taking the action
1292 //
1293 //
1294 CTABLE_INTERNAL int
ctable_PerformSearch(Tcl_Interp * interp,CTable * ctable,CTableSearch * search)1295 ctable_PerformSearch (Tcl_Interp *interp, CTable *ctable, CTableSearch *search) {
1296     int           	  compareResult;
1297     int			  finalResult = TCL_OK;
1298 
1299     ctable_CreatorTable	 *creator = ctable->creator;
1300 
1301     ctable_BaseRow	  *row = NULL;
1302     ctable_BaseRow	  *row1 = NULL;
1303     ctable_BaseRow	  *walkRow;
1304     ctable_BaseRow        *row2 = NULL;
1305     char	  	  *key = NULL;
1306 
1307     int			   bestScore = 0;
1308 
1309     jsw_skip_t   	  *skipList = NULL;
1310     int           	   skipField = 0;
1311     int			   inOrderWalk = 0;
1312 
1313     fieldCompareFunction_t compareFunction = NULL;
1314     int                    indexNumber = -1;
1315     int                    comparisonType = 0;
1316 
1317     int			   sortField = -1;
1318 
1319     enum walkType_e	   walkType	= WALK_DEFAULT;
1320 
1321     enum skipStart_e	   skipStart = SKIP_START_NONE;
1322     enum skipEnd_e	   skipEnd = SKIP_END_NONE;
1323     enum skipNext_e	   skipNext = SKIP_NEXT_NONE;
1324 
1325     int			   myCount;
1326 
1327     int			   inIndex = 0;
1328     Tcl_Obj		 **inListObj = NULL;
1329     ctable_BaseRow	 **inListRows = NULL;
1330     int			   inCount = 0;
1331 
1332     int			   canUseHash = 1;
1333 
1334     CTableSearch         *s;
1335 
1336 #ifdef WITH_SHARED_TABLES
1337     int			   firstTime = 1;
1338     int			   locked_cycle = LOST_HORIZON;
1339     int			   num_restarts = 0;
1340 
1341     jsw_skip_t   	  *skipListCopy = NULL;
1342 #endif
1343 
1344     if (search->writingTabsepIncludeFieldNames) {
1345 	ctable_WriteFieldNames (interp, ctable, search);
1346     }
1347 
1348 #ifdef WITH_SHARED_TABLES
1349     if (firstTime) {
1350 	firstTime = 0;
1351     } else {
1352 restart_search:
1353 	num_restarts++;
1354 
1355 	// Check for indefinite deferral
1356 	if(MAX_RESTARTS > 0 && num_restarts > MAX_RESTARTS) {
1357 	    Tcl_AppendResult (interp, "restart count exceeded", (char *) NULL);
1358 	    finalResult = TCL_ERROR;
1359 	    goto clean_and_return;
1360 	}
1361 
1362 	// re-initialise and de-allocate and clean up, we're going through the
1363 	// while exercise again...
1364 
1365         if (search->tranTable != NULL) {
1366 	    ckfree ((char *)search->tranTable);
1367 	    search->tranTable = NULL;
1368         }
1369 
1370         if(skipListCopy) {
1371 	    jsw_free_private_copy(skipListCopy);
1372 	    skipListCopy = NULL;
1373 	}
1374 
1375 
1376         if (ctable->share_type == CTABLE_SHARED_READER) {
1377 	    read_unlock(ctable->share);
1378 	    locked_cycle = LOST_HORIZON;
1379 	}
1380 
1381 	row = NULL;
1382 	row1 = NULL;
1383 	row2 = NULL;
1384 	key = NULL;
1385 	bestScore = 0;
1386 
1387 	skipList = NULL;
1388         skipField = 0;
1389         inOrderWalk = 0;
1390 
1391         compareFunction = NULL;
1392         indexNumber = -1;
1393         comparisonType = 0;
1394 
1395         sortField = -1;
1396 
1397         walkType = WALK_DEFAULT;
1398 
1399         skipStart = SKIP_START_NONE;
1400         skipEnd = SKIP_END_NONE;
1401         skipNext = SKIP_NEXT_NONE;
1402 
1403         inIndex = 0;
1404         inListObj = NULL;
1405         inCount = 0;
1406     }
1407 
1408     if (ctable->share_type == CTABLE_SHARED_READER) {
1409 	// make sure the dummy ctable is up to date.
1410 	locked_cycle = read_lock(ctable->share);
1411 
1412         ctable->count = ctable->share_ctable->count;
1413 	ctable->skipLists = ctable->share_ctable->skipLists;
1414 	ctable->ll_head = ctable->share_ctable->ll_head;
1415     }
1416 #endif
1417 
1418     search->matchCount = 0;
1419     search->alreadySearched = -1;
1420     if (search->tranTable != NULL) {
1421 	ckfree ((char *)search->tranTable);
1422 	search->tranTable = NULL;
1423     }
1424     search->offsetLimit = search->offset + search->limit;
1425 
1426     if (ctable->count == 0) {
1427 #ifdef WITH_SHARED_TABLES
1428         if(locked_cycle != LOST_HORIZON)
1429 	    read_unlock(ctable->share);
1430 #endif
1431 	if (search->cursor) {
1432 	    ctable_CreateCursorCommand(interp, search->cursor);
1433 	    Tcl_SetObjResult (interp, ctable_CursorToName (search->cursor));
1434 	} else if (search->cursorName) {
1435 	    struct cursor *cursor = ctable_CreateEmptyCursor(interp, ctable, search->cursorName);
1436 	    search->cursorName = NULL;
1437 	    ctable_CreateCursorCommand(interp, cursor);
1438 	    Tcl_SetObjResult (interp, ctable_CursorToName (cursor));
1439 	} else {
1440 	    Tcl_SetObjResult (interp, Tcl_NewIntObj (0));
1441 	}
1442         return TCL_OK;
1443     }
1444 
1445     // Check to see if we're sorting on a single field
1446     if (search->sortControl.nFields == 1) {
1447 	sortField = search->sortControl.fields[0];
1448     }
1449 
1450     // If there's no components in the search, make sure the index is NONE
1451     if(!search->nComponents) {
1452 	search->reqIndexField = CTABLE_SEARCH_INDEX_NONE;
1453     }
1454 
1455     // Check if we can use the hash table.
1456 #ifdef WITH_SHARED_TABLES
1457     // fprintf(stderr, "ctable->share_type=%d\n", ctable->share_type);
1458     if(ctable->share_type == CTABLE_SHARED_READER) {
1459 	// fprintf(stderr, "READER TABLE\n");
1460 	canUseHash = 0;
1461     }
1462 #endif
1463 
1464     // if they're asking for an index search, look for the best search
1465     // in the list of comparisons
1466     //
1467     // See the skipTypes table for the scores, but basically the tighter
1468     // the constraint, the better.
1469     //
1470     // If we can use a hash table, we use it, because it's either JUST
1471     // a simple hash lookup, or it's "in" which has to be handled here.
1472     //
1473     // If we're doing a client search, we don't have access to the
1474     // hashtable, so we can't do a hash search.
1475     if (search->reqIndexField != CTABLE_SEARCH_INDEX_NONE && search->nComponents > 0) {
1476 	int index = 0;
1477 	int trynum;
1478 
1479 	if(search->reqIndexField != CTABLE_SEARCH_INDEX_ANY) {
1480 	    while(index < search->nComponents) {
1481 	        if(search->components[index].fieldID == search->reqIndexField)
1482 		    break;
1483 		index++;
1484 	    }
1485 	}
1486 
1487 	// Look for the best usable search field starting with the requested
1488 	// one
1489 	for(trynum = 0; trynum < search->nComponents; trynum++, index++) {
1490 	    if(index >= search->nComponents)
1491 	        index = 0;
1492 	    CTableSearchComponent *component = &search->components[index];
1493 	    int field = component->fieldID;
1494 	    int score;
1495 
1496 	    comparisonType = component->comparisonType;
1497 
1498 	    // If it's the key, then see if it's something we can walk
1499 	    // using a hash.
1500 	    if(field == creator->keyField && canUseHash) {
1501 		walkType_e tryWalkType = hashTypes[comparisonType];
1502 
1503 		if (tryWalkType != WALK_DEFAULT) {
1504 		    walkType = tryWalkType;
1505 
1506 		    if(walkType == WALK_HASH_IN) {
1507 			inOrderWalk = 0; // TODO: check if list in order
1508 		        inListObj = component->inListObj;
1509 		        inCount = component->inCount;
1510 		    } else { //    WALK_HASH_EQ
1511 			inOrderWalk = 1; // degenerate case, only one result.
1512 			row1 = (ctable_BaseRow*) component->row1;
1513 			key = row1->hashEntry.key;
1514 		    }
1515 
1516 		    search->alreadySearched = index;
1517 
1518 		    // Always use a hash if it's available, because it's
1519 		    // either '=' (with only one result) or 'in' (with at
1520 		    // most inCount results).
1521 		    break;
1522 		}
1523 	    }
1524 
1525 	    // Do we have an index on this puppy?
1526 	    if(!ctable->skipLists[field]) {
1527 		continue;
1528 	    }
1529 
1530 	    // Special case - if it's a match and not anchored, skip
1531 	    if(skipTypes[comparisonType].skipNext == SKIP_NEXT_MATCH) {
1532 		if(component->row2 == NULL) {
1533 		    continue;
1534 		}
1535 	    }
1536 
1537 	    // If we have previous searches, walk back through the previous searches to see if we're already using
1538 	    // this field
1539 	    for(s = search->previousSearch; s; s = s->previousSearch) {
1540 		if(s->searchField == field) {
1541 		    break;
1542 		}
1543 	    }
1544 	    if(s) {
1545 		continue;
1546 	    }
1547 
1548 	    score = skipTypes[comparisonType].score;
1549 
1550 	    // Prefer to avoid sort
1551 	    if(field == sortField) score += SORT_SCORE;
1552 
1553 	    // We already found a better option than this one, skip it
1554 	    if (bestScore > score)
1555 		continue;
1556 
1557 	    // Got a new best candidate, save the world.
1558 	    skipField = field;
1559 	    search->searchField = field;
1560 
1561 	    skipNext  = skipTypes[comparisonType].skipNext;
1562 	    skipStart = skipTypes[comparisonType].skipStart;
1563 	    skipEnd   = skipTypes[comparisonType].skipEnd;
1564 
1565 	    search->alreadySearched = index;
1566 	    skipList = ctable->skipLists[field];
1567 	    walkType = WALK_SKIP;
1568 
1569             compareFunction = creator->fields[field]->compareFunction;
1570             indexNumber = creator->fields[field]->indexNumber;
1571 
1572 	    switch(skipNext) {
1573 	        case SKIP_NEXT_MATCH: {
1574 		    // For a match, we use row2 and row3 as row1 and row2,
1575 		    // otherwise treat it as a range
1576 		    inOrderWalk = 1;
1577 		    row1 = (ctable_BaseRow*)component->row2;
1578 		    row2 = component->row3;
1579 		    skipNext = SKIP_NEXT_ROW;
1580 		    break;
1581 	        }
1582 		case SKIP_NEXT_ROW: {
1583 		    inOrderWalk = 1;
1584 		    row1 = (ctable_BaseRow*) component->row1;
1585 		    row2 = component->row2;
1586 		    break;
1587 		}
1588 		case SKIP_NEXT_IN_LIST: {
1589 		    inOrderWalk = 0; // TODO: check if list in order
1590 		    inListObj = component->inListObj;
1591 		    inCount = component->inCount;
1592 		    if (ctable_CreateInRows(interp, ctable, component) == TCL_ERROR) {
1593 			finalResult = TCL_ERROR;
1594 			goto clean_and_return;
1595 		    }
1596 		    inListRows = component->inListRows;
1597 		    row1 = (ctable_BaseRow*) component->row1;
1598 		    break;
1599 		}
1600 		default: { // Can't happen
1601 		    Tcl_Panic("skipNext has unexpected value %d (comparisonType == %d, skipStart == %d, skipEnd == %d, score == %d)", skipNext, comparisonType, skipStart, skipEnd, score);
1602 		}
1603 	    }
1604 
1605 	    // Score of 100 means it's mandatory
1606 	    if (bestScore >= 100)
1607 	        break;
1608         }
1609     }
1610 
1611 
1612     // if we're sorting on the field we're searching, AND we can eliminate
1613     // the sort because we know we're walking in order, then eliminate the
1614     // sort step
1615     if (inOrderWalk) {
1616         if (search->sortControl.nFields == 1) {
1617 	    if(sortField == skipField) {
1618 		search->sortControl.nFields = 0;
1619 	    }
1620 	}
1621     }
1622 
1623     // Prepare transaction buffering if necessary
1624     ctable_PrepareTransactions(ctable, search);
1625 
1626     // Prepare for background operations
1627     if(search->pollInterval) search->nextPoll = search->pollInterval;
1628 
1629 #ifdef INDEXDEBUG
1630 fprintf(stderr, "Starting search.\n");
1631 fprintf(stderr, "  walkType == %d\n", walkType);
1632 fprintf(stderr, "  sortField == %d\n", sortField);
1633 fprintf(stderr, "  skipField == %d\n", skipField);
1634 fprintf(stderr, "  skipList == 0x%0ld\n", (long)skipList);
1635 if(skipList) {
1636 fprintf(stderr, "    skipNext == %d\n", skipNext);
1637 fprintf(stderr, "    skipStart == %d\n", skipStart);
1638 fprintf(stderr, "    skipEnd == %d\n", skipEnd);
1639 }
1640 fprintf(stderr, "  search->sortControl.nFields == %d\n", search->sortControl.nFields);
1641 int ii;
1642 fprintf(stderr, "  search->nComponents == %d\n", search->nComponents);
1643 for(ii = 0; ii < search->nComponents; ii++) {
1644 CTableSearchComponent *component = &search->components[ii];
1645 fprintf(stderr, "    search->components[%d].fieldID == %d\n", ii, component->fieldID);
1646 fprintf(stderr, "    search->components[%d].comparisonType == %d\n", ii, component->comparisonType);
1647 }
1648 #endif
1649 
1650     if (walkType == WALK_DEFAULT) {
1651 #ifdef INDEXDEBUG
1652 fprintf(stderr, "WALK_DEFAULT\n");
1653 #endif
1654 	// walk the hash table links.
1655 	CTABLE_LIST_FOREACH (ctable->ll_head, row, 0) {
1656 	    compareResult = ctable_SearchCompareRow (interp, ctable, search, row);
1657 	    if ((compareResult == TCL_CONTINUE) || (compareResult == TCL_OK))
1658 		continue;
1659 
1660 	    if (compareResult == TCL_BREAK)
1661 		break;
1662 
1663 	    if (compareResult == TCL_RETURN) {
1664 		finalResult = TCL_RETURN;
1665 		break;
1666 	    }
1667 
1668 	    if (compareResult == TCL_ERROR) {
1669 		finalResult = TCL_ERROR;
1670 		goto clean_and_return;
1671 	    }
1672         }
1673     } else if(walkType == WALK_HASH_EQ || walkType == WALK_HASH_IN) {
1674 #ifdef INDEXDEBUG
1675 fprintf(stderr, "WALK_HASH_*\n");
1676 #endif
1677 	// Just look up the necessary hash table values
1678 
1679 	// This loop is a little complex because for "=" the key is
1680 	// already set, otherwise it needs to be loaded from the index
1681 	// list. This would actually be simpler with a goto. :)
1682 	while (inIndex < inCount || key) {
1683 	    // If we don't have a key, get one.
1684 	    if (!key)
1685 		key = Tcl_GetString(inListObj[inIndex++]);
1686 
1687 	    // Look it up
1688 	    row2 = creator->find_row(ctable, key);
1689 
1690 	    // Throw away this key
1691 	    key = NULL;
1692 
1693 	    // If we didn't find the prize, try again
1694 	    if(!row2)
1695 		continue;
1696 
1697 	    row1 = (ctable_BaseRow *)row2;
1698 
1699 	    compareResult = ctable_SearchCompareRow (interp, ctable, search, row1);
1700 	    if ((compareResult == TCL_CONTINUE) || (compareResult == TCL_OK))
1701 		continue;
1702 
1703 	    if (compareResult == TCL_BREAK)
1704 		break;
1705 
1706 	    if (compareResult == TCL_RETURN) {
1707 		finalResult = TCL_RETURN;
1708 		break;
1709 	    }
1710 	    if (compareResult == TCL_ERROR) {
1711 		finalResult = TCL_ERROR;
1712 		goto clean_and_return;
1713 	    }
1714         }
1715     } else {
1716 #ifdef INDEXDEBUG
1717 fprintf(stderr, "WALK_SKIP\n");
1718 #endif
1719 
1720         //
1721         // walk the skip list
1722         //
1723         // all walks are "tailored", meaning the first search term is of an
1724         // indexed row and it's of a type where we can cut off the search
1725         // past a point, see if we're past the cutoff point and if we are
1726         // terminate the search.
1727         //
1728 #ifdef WITH_SHARED_TABLES
1729 	struct restart_t main_restart;
1730 	struct restart_t loop_restart;
1731 	cell_t main_cycle = LOST_HORIZON;
1732 	cell_t loop_cycle = LOST_HORIZON;
1733 
1734 #ifdef SANITY_CHECKS
1735 	creator->sanity_check_pointer(ctable, (void *)skipList, CTABLE_INDEX_NORMAL, "ctablePerformSearch : skipList");
1736 #endif
1737 
1738 	if(ctable->share_type == CTABLE_SHARED_READER) {
1739 	    // Save the cycle at the time we started the search
1740 	    main_cycle = ctable->share->map->cycle;
1741 #ifdef SANITY_CHECKS
1742 	    if(main_cycle == LOST_HORIZON)
1743 		Tcl_Panic("Master is not updating the garbage collect cycle!");
1744 #endif
1745 	    // save the main restart condition
1746 	    main_restart.row1 = row1;
1747 	    main_restart.row2 = (ctable_BaseRow*)row2;
1748 	    main_restart.skipStart = skipStart;
1749 	    main_restart.skipEnd = skipEnd;
1750 	    main_restart.compareFunction = compareFunction;
1751 
1752 	    // clone the skiplist
1753 	    skipListCopy = jsw_private_copy(skipList, getpid(), compareFunction);
1754 	    if(skipListCopy)
1755 		skipList = skipListCopy;
1756 	}
1757 #endif
1758 
1759 	// Find the row to start walking on
1760 	switch(skipStart) {
1761 
1762 	    case SKIP_START_EQ_ROW1: {
1763 if(!row1) Tcl_Panic("Can't happen! Row1 is null for '=' comparison.");
1764 		jsw_sfind (skipList, row1);
1765 		break;
1766 	    }
1767 
1768 	    case SKIP_START_GE_ROW1: {
1769 if(!row1) Tcl_Panic("Can't happen! Row1 is null for '>=' comparison.");
1770 		jsw_sfind_equal_or_greater (skipList, row1);
1771 		break;
1772 	    }
1773 
1774 	    case SKIP_START_GT_ROW1: {
1775 if(!row1) Tcl_Panic("Can't happen! Row1 is null for '>' comparison.");
1776 		jsw_sfind_equal_or_greater (skipList, row1);
1777 		while (1) {
1778                     if ((row = jsw_srow (skipList)) == NULL)
1779 		        goto search_complete;
1780 	    	    if (compareFunction (row, row1) > 0)
1781 			break;
1782 		    jsw_snext(skipList);
1783 		}
1784 		break;
1785 	    }
1786 
1787 	    case SKIP_START_RESET: {
1788 		jsw_sreset(skipList);
1789 		break;
1790 	    }
1791 
1792 	    default: { // can't happen
1793 		Tcl_Panic("skipStart has unexpected value %d", skipStart);
1794 	    }
1795 	}
1796 
1797 #ifdef MEGADEBUG
1798 #ifdef WITH_SHARED_TABLES
1799 if(ctable->share_type == CTABLE_SHARED_READER)
1800     fprintf(stderr, "Following skiplist 0x%lX\n", (long)skipList);
1801 #endif
1802 #endif
1803 
1804 	// Count is only to make sure we blow up on infinite loops
1805 	// It's compared less than OR equal to the largest possible value
1806 	// it could have.
1807 	for(myCount = 0; myCount <= ctable->count; myCount++) {
1808 	    if(skipNext == SKIP_NEXT_IN_LIST) {
1809 		// We're looking at a list of entries rather than a range,
1810 		// so we have to loop here searching for each in turn instead
1811 		// of just following the skiplist
1812 		while(1) {
1813 		  // If we're at the end, game over.
1814 		  if(inIndex >= inCount)
1815 		      goto search_complete;
1816 
1817 #ifdef WITH_SHARED_TABLES
1818 		  if(main_cycle != LOST_HORIZON) {
1819 	            // Save the cycle at the time we started the walk
1820 	            main_cycle = ctable->share->map->cycle;
1821 		  }
1822 #endif
1823 
1824 		  row = inListRows[inIndex++];
1825 if(!row) Tcl_Panic("Can't happen, null row in 'in' comparison");
1826 
1827 		  // If there's a match for this row, break out of the loop
1828                   if (jsw_sfind (skipList, row) != NULL)
1829 		      break;
1830 	        }
1831 	    }
1832 
1833 	    // Now we can fetch whatever we found.
1834             if ((row = jsw_srow (skipList)) == NULL)
1835 		goto search_complete;
1836 #ifdef SANITY_CHECKS
1837 	creator->sanity_check_pointer(ctable, (void *)row, CTABLE_INDEX_NORMAL, "ctablePerformSearch : row");
1838 #endif
1839 
1840 #ifdef WITH_SHARED_TABLES
1841 	    // If we're a reader and this row has changed since we started
1842 	    // then check if it changed the skiplist we're following, if so...
1843 	    // go back and restart the search
1844 	    if(main_cycle != LOST_HORIZON) {
1845 	        // Save the cycle at the time we started the loop
1846 	        loop_cycle = ctable->share->map->cycle;
1847 		if(row->_row_cycle != LOST_HORIZON) {
1848 		    int delta = row->_row_cycle - main_cycle;
1849 		    if(delta > 0) {
1850 		        if(ctable_SearchRestartNeeded(row, &main_restart)) {
1851 #ifdef MEGADEBUG
1852 if(num_restarts == 0) fprintf(stderr, "%d: main restart: main_cycle=%ld; row->_row_cycle=%ld; delta %d\n", getpid(), (long)main_cycle, (long)row->_row_cycle, delta);
1853 #endif
1854 			    goto restart_search;
1855 			}
1856 		    }
1857 #ifdef SANITY_CHECKS
1858 		} else {
1859 		    Tcl_Panic("Master is not copying the garbage collect cycle to the row!");
1860 #endif
1861 		}
1862 
1863 	        // save the loop restart condition - exact match for this field
1864 	        loop_restart.row1 = row;
1865 	        loop_restart.row2 = row;
1866 	        loop_restart.skipStart = SKIP_START_EQ_ROW1;
1867 	        loop_restart.skipEnd = SKIP_END_NE_ROW1;
1868 	        loop_restart.compareFunction = compareFunction;
1869 	    }
1870 #endif
1871 
1872 	    // if at end or past any terminating condition, break
1873 	    switch(skipEnd) {
1874 	        case SKIP_END_GE_ROW1: {
1875 		    if (compareFunction (row, row1) >= 0)
1876 			goto search_complete;
1877 		    break;
1878 		}
1879 	        case SKIP_END_GT_ROW1: {
1880 		    if (compareFunction (row, row1) > 0)
1881 			goto search_complete;
1882 		    break;
1883 		}
1884 	        case SKIP_END_GE_ROW2: {
1885 		    if (compareFunction (row, row2) >= 0)
1886 			goto search_complete;
1887 		    break;
1888 		}
1889 		default: {
1890 		    // may not be a terminating condition, or it may
1891 		    // have been taken care of above
1892 	        }
1893 	    }
1894 
1895             // walk walkRow through the linked list of rows off this skip list
1896 	    // node. This is not the safe foreach routine because we don't
1897 	    // change the skiplist during the search, and if someone else
1898 	    // does it we need to restart the transaction anyway.
1899 
1900             CTABLE_LIST_FOREACH (row, walkRow, indexNumber) {
1901 
1902 #ifdef WITH_SHARED_TABLES
1903 		// If we're a reader and this row has changed since we started
1904 		// then check if it changed the list we're following, if so...
1905 		// go back and restart the search
1906 	        if(loop_cycle != LOST_HORIZON) {
1907 		    if(row->_row_cycle != LOST_HORIZON) {
1908 		        int delta = row->_row_cycle - main_cycle;
1909 		        if(delta > 0) {
1910 		            if(ctable_SearchRestartNeeded(row, &loop_restart)) {
1911 #ifdef MEGADEBUG
1912 if(num_restarts == 0) fprintf(stderr, "%d: loop restart: loop_cycle=%ld; row->_row_cycle=%ld; delta=%d\n", getpid(), (long)loop_cycle, (long)row->_row_cycle, delta);
1913 #endif
1914 			        goto restart_search;
1915 			    }
1916 		        }
1917 #ifdef SANITY_CHECKS
1918 		    } else {
1919 		        Tcl_Panic("Master is not copying the garbage collect cycle to the row!");
1920 #endif
1921 		    }
1922 	        }
1923 #endif
1924 
1925 	        compareResult = ctable_SearchCompareRow (interp, ctable, search, walkRow);
1926 	        if ((compareResult == TCL_CONTINUE) || (compareResult == TCL_OK)) continue;
1927 
1928 	        if (compareResult == TCL_BREAK)
1929 		    goto search_complete;
1930 
1931 		if (compareResult == TCL_RETURN) {
1932 		    finalResult = TCL_RETURN;
1933 	            goto search_complete;
1934 	        }
1935 
1936 	        if (compareResult == TCL_ERROR) {
1937 	            finalResult = TCL_ERROR;
1938 	            goto clean_and_return;
1939 	        }
1940 	    }
1941 
1942 	    if(skipNext == SKIP_NEXT_ROW)
1943 		jsw_snext(skipList);
1944 	}
1945 	// Should never just fall out of the loop...
1946 	Tcl_AppendResult (interp, "infinite search loop", (char *) NULL);
1947 	finalResult = TCL_ERROR;
1948 	goto clean_and_return;
1949     }
1950 
1951   // We only jump to this on success, so we got to the end of the loop
1952   // or we broke out of it early
1953   search_complete:
1954 
1955     // We're no longer walking a skiplist, so make a note of that so it can be re-used.
1956     search->searchField = -1;
1957 
1958     switch (ctable_PostSearchCommonActions (interp, ctable, search)) {
1959 	case TCL_ERROR: {
1960 	    finalResult = TCL_ERROR;
1961 	    break;
1962 	}
1963 	case TCL_RETURN: {
1964 	    finalResult = TCL_RETURN;
1965 	    break;
1966 	}
1967     }
1968 
1969   // We only jump to this on an error
1970   clean_and_return:
1971     if (search->tranTable != NULL) {
1972 	ckfree ((char *)search->tranTable);
1973 	search->tranTable = NULL;
1974     }
1975 
1976     if (finalResult != TCL_ERROR && (search->codeBody == NULL || finalResult != TCL_RETURN)) {
1977 	if(search->cursor) {
1978 	    // We got here so we can create the command
1979 	    ctable_CreateCursorCommand(interp, search->cursor);
1980 	    Tcl_SetObjResult (interp, ctable_CursorToName (search->cursor));
1981 	} else if (search->cursorName) {
1982 	    struct cursor *cursor = ctable_CreateEmptyCursor(interp, ctable, search->cursorName);
1983 	    search->cursorName = NULL;
1984 	    ctable_CreateCursorCommand(interp, cursor);
1985 	    Tcl_SetObjResult (interp, ctable_CursorToName (cursor));
1986 	} else {
1987 	    Tcl_SetObjResult (interp, Tcl_NewIntObj (search->matchCount));
1988 	}
1989     }
1990 
1991 #ifdef WITH_SHARED_TABLES
1992     if(search->cursor)
1993 	search->cursor->lockCycle = locked_cycle;
1994     else if(locked_cycle != LOST_HORIZON)
1995 	read_unlock(ctable->share);
1996 
1997     if(skipListCopy)
1998 	jsw_free_private_copy(skipListCopy);
1999 
2000 #ifdef MEGADEBUG
2001 if(num_restarts) fprintf(stderr, "%d: Restarted search %d times\n", getpid(), num_restarts);
2002 #endif
2003 #endif
2004 
2005     return finalResult;
2006 }
2007 
2008 //
2009 // ctable_SetupSearch - prepare to search by parsing the command line arguments
2010 // specified when the ctables "search" method is invoked.
2011 //
2012 //
2013 static int
ctable_SetupSearch(Tcl_Interp * interp,CTable * ctable,Tcl_Obj * CONST objv[],int objc,CTableSearch * search,int indexField,CTableSearch * previous_search)2014 ctable_SetupSearch (Tcl_Interp *interp, CTable *ctable, Tcl_Obj *CONST objv[], int objc, CTableSearch *search, int indexField, CTableSearch *previous_search) {
2015     int             i;
2016     int             searchTerm = 0;
2017     CONST char    **fieldNames = ctable->creator->fieldNames;
2018     int		    quick_count;
2019 
2020     static int staticSequence = 0;
2021 
2022     static CONST char *searchOptions[] = {"-array", "-array_with_nulls", "-array_get", "-array_get_with_nulls", "-code", "-compare", "-countOnly", "-fields", "-get", "-glob", "-key", "-with_field_names", "-limit", "-nokeys", "-offset", "-sort", "-write_tabsep", "-tab", "-delete", "-update", "-buffer", "-index", "-poll_code", "-poll_interval", "-quote", "-null", "-filter", "-cursor", (char *)NULL};
2023 
2024     enum searchOptions {SEARCH_OPT_ARRAY_NAMEOBJ, SEARCH_OPT_ARRAYWITHNULLS_NAMEOBJ, SEARCH_OPT_ARRAYGET_NAMEOBJ, SEARCH_OPT_ARRAYGETWITHNULLS_NAMEOBJ, SEARCH_OPT_CODE, SEARCH_OPT_COMPARE, SEARCH_OPT_COUNTONLY, SEARCH_OPT_FIELDS, SEARCH_OPT_GET_NAMEOBJ, SEARCH_OPT_GLOB, SEARCH_OPT_KEYVAR_NAMEOBJ, SEARCH_OPT_WITH_FIELD_NAMES, SEARCH_OPT_LIMIT, SEARCH_OPT_DONT_INCLUDE_KEY, SEARCH_OPT_OFFSET, SEARCH_OPT_SORT, SEARCH_OPT_WRITE_TABSEP, SEARCH_OPT_TAB, SEARCH_OPT_DELETE, SEARCH_OPT_UPDATE, SEARCH_OPT_BUFFER, SEARCH_OPT_INDEX, SEARCH_OPT_POLL_CODE, SEARCH_OPT_POLL_INTERVAL, SEARCH_OPT_QUOTE_TYPE, SEARCH_OPT_NULL_STRING, SEARCH_OPT_FILTER, SEARCH_OPT_CURSOR};
2025     if (objc < 2) {
2026       wrong_args:
2027 	Tcl_WrongNumArgs (interp, 2, objv, "?-array_get varName? ?-array_get_with_nulls varName? ?-code codeBody? ?-compare list? ?-filter list? ?-countOnly 0|1? ?-fields fieldList? ?-get varName? ?-glob pattern? ?-key varName? ?-with_field_names 0|1?  ?-limit limit? ?-nokeys 0|1? ?-offset offset? ?-sort {?-?field1..}? ?-write_tabsep channel? ?-tab value? ?-delete 0|1? ?-update {fields value...}? ?-buffer 0|1? ?-poll_interval interval? ?-poll_code codeBody? ?-quote type?");
2028 	return TCL_ERROR;
2029     }
2030 
2031     // Quick count of table rows for early optimizations where ONLY the
2032     // number of rows is being used. This quick_count is a snapshot that
2033     // is safe to use for shared readers because reading it is an atomic
2034     // operation.
2035 #ifdef WITH_SHARED_TABLES
2036     if (ctable->share_type == CTABLE_SHARED_READER)
2037 	quick_count = ctable->share_ctable->count;
2038     else
2039 #endif
2040 	quick_count = ctable->count;
2041 
2042 // TODO figure out how to handle the case where a cursor command is being used. In the meantime we can't use this shortcut
2043 #if 0
2044     // if there are no rows in the table, the search won't turn up
2045     // anything, so skip all that
2046     if (quick_count == 0)
2047     {
2048 	search->filters = NULL;
2049 	search->components = NULL; // keep ctable_searchTeardown happy
2050 	Tcl_SetObjResult (interp, Tcl_NewIntObj (0));
2051 	return TCL_RETURN;
2052     }
2053 #endif
2054 
2055     // initialize search control structure
2056     search->ctable = ctable;
2057     search->previousSearch = previous_search;
2058     search->action = CTABLE_SEARCH_ACTION_NONE;
2059     search->nComponents = 0;
2060     search->components = NULL;
2061     search->nFilters = 0;
2062     search->filters = NULL;
2063     search->countMax = 0;
2064     search->offset = 0;
2065     search->limit = 0;
2066     search->pattern = NULL;
2067     search->pollInterval = 0;
2068     search->nextPoll = -1;
2069     search->pollCodeBody = NULL;
2070     search->sortControl.fields = NULL;
2071     search->sortControl.directions = NULL;
2072     search->sortControl.nFields = 0;
2073     search->retrieveFields = NULL;
2074     search->nRetrieveFields = -1;   // -1 = all, 0 = none
2075     search->noKeys = 0;
2076     search->rowVarNameObj = NULL;
2077     search->keyVarNameObj = NULL;
2078     search->codeBody = NULL;
2079     search->writingTabsepIncludeFieldNames = 0;
2080     search->tranType = CTABLE_SEARCH_TRAN_NONE;
2081     search->tranData = NULL;
2082     search->reqIndexField = indexField;
2083     search->bufferResults = CTABLE_BUFFER_DEFAULT;
2084     search->sepstr = "\t";
2085     search->nullString = NULL;
2086     search->quoteType = CTABLE_QUOTE_NONE;
2087     search->matchCount = 0;
2088     search->alreadySearched = -1;
2089     search->tranTable = NULL;
2090     search->offsetLimit = search->offset + search->limit;
2091     search->cursorName = NULL;
2092     search->cursor = NULL;
2093     search->searchField = -1;
2094 
2095     // Give each search a unique non-zero sequence number
2096     if(++staticSequence == 0) ++staticSequence;
2097     search->sequence = staticSequence;
2098 
2099     for (i = 2; i < objc; ) {
2100 	if (Tcl_GetIndexFromObj (interp, objv[i++], searchOptions, "search option", TCL_EXACT, &searchTerm) != TCL_OK) {
2101 	    return TCL_ERROR;
2102 	}
2103 
2104 	//  all the arguments require one parameter
2105 	if (i >= objc) {
2106 	    goto wrong_args;
2107 	}
2108 
2109 	switch (searchTerm) {
2110 	  case SEARCH_OPT_SORT: {
2111 	    // the fields they want us to sort on
2112 	    if (ctable_ParseSortFieldList (interp, objv[i++], fieldNames, &search->sortControl) == TCL_ERROR) {
2113 	        Tcl_AppendResult (interp, " while processing sort options", (char *) NULL);
2114 	        return TCL_ERROR;
2115 	    }
2116 	    break;
2117 	  }
2118 
2119 	  case SEARCH_OPT_FIELDS: {
2120 	    // the fields they want us to retrieve
2121 	    if (ctable_ParseFieldList (interp, objv[i++], fieldNames, &search->retrieveFields, &search->nRetrieveFields) == TCL_ERROR) {
2122 	        Tcl_AppendResult (interp, " while processing search fields", (char *) NULL);
2123 	        return TCL_ERROR;
2124 	    }
2125 
2126 	    break;
2127 	  }
2128 
2129 	  case SEARCH_OPT_DONT_INCLUDE_KEY: {
2130 	    if (Tcl_GetBooleanFromObj (interp, objv[i++], &search->noKeys) == TCL_ERROR) {
2131 	        Tcl_AppendResult (interp, " while processing search -nokeys", (char *) NULL);
2132 	        return TCL_ERROR;
2133 	    }
2134 	    break;
2135 	  }
2136 
2137 	  case SEARCH_OPT_INDEX: {
2138 	    if (Tcl_GetIndexFromObj (interp, objv[i++], fieldNames, "field", TCL_EXACT, &search->reqIndexField) != TCL_OK) {
2139 		Tcl_AppendResult (interp, " while processing search index", (char *) NULL);
2140 		return TCL_ERROR;
2141 	    }
2142 	    break;
2143 	  }
2144 
2145 	  case SEARCH_OPT_WITH_FIELD_NAMES: {
2146 	    if (Tcl_GetBooleanFromObj (interp, objv[i++], &search->writingTabsepIncludeFieldNames) == TCL_ERROR) {
2147 	        Tcl_AppendResult (interp, " while processing search -with_field_names", (char *) NULL);
2148 	        return TCL_ERROR;
2149 	    }
2150 	    break;
2151 	  }
2152 
2153 	  case SEARCH_OPT_ARRAY_NAMEOBJ: {
2154 	    if (search->action != CTABLE_SEARCH_ACTION_NONE)
2155 		goto actionOverload;
2156 	    search->rowVarNameObj = objv[i++];
2157 	    search->action = CTABLE_SEARCH_ACTION_ARRAY;
2158 	    break;
2159 	  }
2160 
2161 	  case SEARCH_OPT_ARRAYWITHNULLS_NAMEOBJ: {
2162 	    if (search->action != CTABLE_SEARCH_ACTION_NONE)
2163 		goto actionOverload;
2164 	    search->rowVarNameObj = objv[i++];
2165 	    search->action = CTABLE_SEARCH_ACTION_ARRAY_WITH_NULLS;
2166 	    break;
2167 	  }
2168 
2169 	  case SEARCH_OPT_ARRAYGET_NAMEOBJ: {
2170 	    if (search->action != CTABLE_SEARCH_ACTION_NONE)
2171 		goto actionOverload;
2172 	    search->rowVarNameObj = objv[i++];
2173 	    search->action = CTABLE_SEARCH_ACTION_ARRAY_GET;
2174 	    break;
2175 	  }
2176 
2177 	  case SEARCH_OPT_ARRAYGETWITHNULLS_NAMEOBJ: {
2178 	    if (search->action != CTABLE_SEARCH_ACTION_NONE)
2179 		goto actionOverload;
2180 	    search->rowVarNameObj = objv[i++];
2181 	    search->action = CTABLE_SEARCH_ACTION_ARRAY_GET_WITH_NULLS;
2182 	    break;
2183 	  }
2184 
2185 	  case SEARCH_OPT_KEYVAR_NAMEOBJ: {
2186 	    search->keyVarNameObj = objv[i++];
2187 	    break;
2188           }
2189 
2190 	  case SEARCH_OPT_GET_NAMEOBJ: {
2191 	    if (search->action != CTABLE_SEARCH_ACTION_NONE)
2192 		goto actionOverload;
2193 	    search->rowVarNameObj = objv[i++];
2194 	    search->action = CTABLE_SEARCH_ACTION_GET;
2195 	    break;
2196           }
2197 
2198 	  case SEARCH_OPT_GLOB: {
2199 	    search->pattern = Tcl_GetString (objv[i++]);
2200 	    break;
2201 	  }
2202 
2203 	  case SEARCH_OPT_COMPARE: {
2204 	    if (ctable_ParseSearch (interp, ctable, objv[i++], fieldNames, search) == TCL_ERROR) {
2205 	        Tcl_AppendResult (interp, " while processing search compare", (char *) NULL);
2206 	        return TCL_ERROR;
2207 	    }
2208 	    break;
2209 	  }
2210 
2211 	  case SEARCH_OPT_FILTER: {
2212 	    if (ctable_ParseFilters (interp, ctable, objv[i++], search) == TCL_ERROR) {
2213 		Tcl_AppendResult (interp, " while processing search filter", (char *)NULL);
2214 		return TCL_ERROR;
2215 	    }
2216 	    break;
2217 	  }
2218 
2219 	  case SEARCH_OPT_COUNTONLY: {
2220 #if 0
2221 	    int countOnly;
2222 
2223 	    if (Tcl_GetBooleanFromObj (interp, objv[i++], &countOnly) == TCL_ERROR) {
2224 	        Tcl_AppendResult (interp, " while processing search countOnly", (char *) NULL);
2225 	        return TCL_ERROR;
2226 	    }
2227 
2228 	    if (countOnly) {
2229 		if (search->action != CTABLE_SEARCH_ACTION_NONE)
2230 		    goto actionOverload;
2231 		search->action = CTABLE_SEARCH_ACTION_COUNT;
2232 	    }
2233 #else
2234 	    i++; // skip argument
2235 #endif
2236 	    break;
2237 	  }
2238 
2239 	  case SEARCH_OPT_OFFSET: {
2240 	    if (Tcl_GetIntFromObj (interp, objv[i++], &search->offset) == TCL_ERROR) {
2241 	        Tcl_AppendResult (interp, " while processing search offset", (char *) NULL);
2242 	        return TCL_ERROR;
2243 	    }
2244 
2245 	    if (search->offset < 0) {
2246 	        Tcl_AppendResult (interp, "Search offset cannot be negative", (char *) NULL);
2247 	        return TCL_ERROR;
2248 	    }
2249 	    break;
2250 	  }
2251 
2252 	  case SEARCH_OPT_LIMIT: {
2253 	    if (Tcl_GetIntFromObj (interp, objv[i++], &search->limit) == TCL_ERROR) {
2254 	        Tcl_AppendResult (interp, " while processing search limit", (char *) NULL);
2255 	        return TCL_ERROR;
2256 	    }
2257 
2258 	    if (search->limit < 0) {
2259 	        Tcl_AppendResult (interp, "Search limit cannot be negative", (char *) NULL);
2260 	        return TCL_ERROR;
2261 	    }
2262 	    break;
2263 	  }
2264 
2265 	  case SEARCH_OPT_CODE: {
2266 	      search->codeBody = objv[i++];
2267 	      break;
2268 	  }
2269 
2270 	  case SEARCH_OPT_POLL_CODE: {
2271 	      search->pollCodeBody = objv[i++];
2272 	      if (!search->pollInterval)
2273 		  search->pollInterval = CTABLE_DEFAULT_POLL_INTERVAL;
2274 	      break;
2275 	  }
2276 
2277 	  case SEARCH_OPT_POLL_INTERVAL: {
2278 	    int poll_interval;
2279 	    if (Tcl_GetIntFromObj (interp, objv[i++], &poll_interval) == TCL_ERROR) {
2280 	        Tcl_AppendResult (interp, " while processing poll_interval option", (char *) NULL);
2281 	        return TCL_ERROR;
2282 	    }
2283 
2284 	    search->pollInterval = poll_interval;
2285 	    break;
2286 	  }
2287 
2288 	  case SEARCH_OPT_DELETE: {
2289 	    int do_delete;
2290 	    if (Tcl_GetIntFromObj (interp, objv[i++], &do_delete) == TCL_ERROR) {
2291 	        Tcl_AppendResult (interp, " while processing delete option", (char *) NULL);
2292 	        return TCL_ERROR;
2293 	    }
2294 
2295 	    if(do_delete) {
2296 	      if(ctable->cursors) {
2297 	        Tcl_AppendResult(interp, "Can not delete while cursors are active.", NULL);
2298 	        Tcl_SetErrorCode (interp, "speedtables", "no_delete_with_cursors", NULL);
2299 	        return TCL_ERROR;
2300 	      }
2301 	      if(previous_search) {
2302 	        Tcl_AppendResult(interp, "Can not delete in nested search.", NULL);
2303 	        Tcl_SetErrorCode (interp, "speedtables", "no_delete_inside_search", NULL);
2304 	        return TCL_ERROR;
2305 	      }
2306 #ifdef WITH_SHARED_TABLES
2307 	      if(ctable->share_type == CTABLE_SHARED_READER) {
2308 		Tcl_AppendResult (interp, "Can't modify read-only tables.", (char *)NULL);
2309 		return TCL_ERROR;
2310 	      }
2311 #endif
2312 	      if(search->tranType != CTABLE_SEARCH_TRAN_NONE &&
2313 	         search->tranType != CTABLE_SEARCH_TRAN_DELETE
2314 	      ) {
2315 		Tcl_AppendResult (interp, "Can not combine -delete with other transaction options", (char *)NULL);
2316 		return TCL_ERROR;
2317 	      }
2318 	      search->tranType = CTABLE_SEARCH_TRAN_DELETE;
2319 	    } else {
2320 	      if(search->tranType == CTABLE_SEARCH_TRAN_DELETE)
2321 		search->tranType = CTABLE_SEARCH_TRAN_NONE;
2322 	    }
2323 	    break;
2324 	  }
2325 
2326 	  case SEARCH_OPT_UPDATE: {
2327 #ifdef WITH_SHARED_TABLES
2328 	    if(ctable->share_type == CTABLE_SHARED_READER) {
2329 		Tcl_AppendResult (interp, "Can't modify read-only tables.", (char *)NULL);
2330 		return TCL_ERROR;
2331 	    }
2332 #endif
2333 	    if(search->tranType != CTABLE_SEARCH_TRAN_NONE &&
2334 	       search->tranType != CTABLE_SEARCH_TRAN_UPDATE
2335 	    ) {
2336 		Tcl_AppendResult (interp, "Can not combine -update with other transaction options", (char *)NULL);
2337 		return TCL_ERROR;
2338 	    }
2339 	    search->tranType = CTABLE_SEARCH_TRAN_UPDATE;
2340 	    search->tranData = objv[i++];
2341 	    break;
2342 	  }
2343 
2344 	  case SEARCH_OPT_BUFFER: {
2345 	    int do_buffer;
2346 	    if (Tcl_GetIntFromObj (interp, objv[i++], &do_buffer) == TCL_ERROR) {
2347 	        Tcl_AppendResult (interp, " while processing buffer option", (char *) NULL);
2348 	        return TCL_ERROR;
2349 	    }
2350 
2351 	    search->bufferResults = do_buffer;
2352 	    break;
2353 	  }
2354 
2355 	  case SEARCH_OPT_TAB: {
2356 	    search->sepstr = Tcl_GetString(objv[i++]);
2357 	    break;
2358 	  }
2359 
2360 	  case SEARCH_OPT_NULL_STRING: {
2361 	    search->nullString = Tcl_GetString(objv[i++]);
2362 	    break;
2363 	  }
2364 
2365 	  case SEARCH_OPT_QUOTE_TYPE: {
2366 	    search->quoteType = ctable_parseQuoteType(interp, objv[i++]);
2367 	    if(search->quoteType < 0) {
2368 		Tcl_AppendResult (interp, "in argument to -quote", NULL);
2369 		return TCL_ERROR;
2370 	    }
2371 	    break;
2372 	  }
2373 
2374 	  case SEARCH_OPT_WRITE_TABSEP: {
2375 	    int        mode;
2376 	    char      *channelName;
2377 
2378 	    if (search->action != CTABLE_SEARCH_ACTION_NONE)
2379 		goto actionOverload;
2380 
2381 	    channelName = Tcl_GetString (objv[i++]);
2382 	    if ((search->tabsepChannel = Tcl_GetChannel (interp, channelName, &mode)) == NULL) {
2383 	        Tcl_AppendResult (interp, " while processing write_tabsep channel", (char *) NULL);
2384 	        return TCL_ERROR;
2385 	    }
2386 
2387 	    if (!(mode & TCL_WRITABLE)) {
2388 		Tcl_AppendResult (interp, "channel \"", channelName, "\" not writable", (char *)NULL);
2389 		return TCL_ERROR;
2390 	    }
2391 
2392 	    search->action = CTABLE_SEARCH_ACTION_WRITE_TABSEP;
2393 
2394 	    break;
2395 	  }
2396 
2397 	  case SEARCH_OPT_CURSOR: {
2398 	    char *cursorName = NULL;
2399 	    struct cursor *c;
2400 
2401             if(search->tranType != CTABLE_SEARCH_TRAN_NONE || search->action != CTABLE_SEARCH_ACTION_NONE) {
2402 		Tcl_AppendResult (interp, "Can not combine -cursor with other operations", (char *)NULL);
2403 	    }
2404 
2405 	    cursorName = Tcl_GetString (objv[i++]);
2406 
2407 	    if (strcmp (cursorName, "#auto") == 0) {
2408 		char *tableName = NULL;
2409 		int tableNameLength;
2410 		static unsigned long int auto_cursor_id = 0;
2411 
2412 		// use command name of the ctable as the base of the cursor name
2413 		tableName = Tcl_GetStringFromObj (objv[0], &tableNameLength);
2414 		tableNameLength += 42+2;
2415 		cursorName = (char *) ckalloc (tableNameLength);
2416 		snprintf(cursorName, tableNameLength, "%s_C%lu", tableName, ++auto_cursor_id);
2417 	    } else {
2418 		char *tmp = (char *)ckalloc (strlen(cursorName) + 1);
2419 		strcpy(tmp, cursorName);
2420 		cursorName = tmp;
2421 	    }
2422 
2423 	    for (c = ctable->cursors; c; c = c->nextCursor) {
2424 		if(strcmp(c->cursorName, cursorName) == 0) {
2425 		    Tcl_AppendResult (interp, "Cursor name must not duplicate an existing cursor on the same table", (char *)NULL);
2426 		    ckfree(cursorName);
2427 	            return TCL_ERROR;
2428 		}
2429 	    }
2430 
2431 	    search->tranType = CTABLE_SEARCH_TRAN_CURSOR;
2432 	    search->action = CTABLE_SEARCH_ACTION_CURSOR;
2433 	    search->cursorName = cursorName;
2434 	  }
2435 	}
2436     }
2437 
2438     // If we have a code body, make sure we're not doing a write_tabsep or returning a cursor, make
2439     // sure we have a row variable or a key variable, and that we're not
2440     // leaving the search action "none"
2441     if (search->codeBody != NULL) {
2442 	if (search->action == CTABLE_SEARCH_ACTION_WRITE_TABSEP || search->action == CTABLE_SEARCH_ACTION_CURSOR) {
2443 	    Tcl_AppendResult (interp, "Both -code and -write_tabsep or -cursor specified", (char *)NULL);
2444 	    goto errorReturn;
2445 	}
2446 	if (search->rowVarNameObj == NULL && search->keyVarNameObj == NULL) {
2447 	    Tcl_AppendResult (interp, "Code block specified, but none of -key, -get, -array, -array_get, -array_with_nulls, or -array_get_with_nulls provided", NULL);
2448 	    goto errorReturn;
2449 	}
2450 	if(search->action == CTABLE_SEARCH_ACTION_NONE)
2451 	    search->action = CTABLE_SEARCH_ACTION_CODE;
2452     }
2453 
2454     // If we're doing a transaction, make sure we're not leaving the search
2455     // action "none"
2456     if(search->tranType != CTABLE_SEARCH_TRAN_NONE) {
2457 	if(search->action == CTABLE_SEARCH_ACTION_NONE)
2458 	    search->action = CTABLE_SEARCH_ACTION_TRANSACTION_ONLY;
2459     }
2460 
2461     // If there's nothing going on in the search, then skip the search and
2462     // return quick_count (calculated earlier).
2463     if(search->action == CTABLE_SEARCH_ACTION_NONE) {
2464 	if(search->nComponents == 0 && search->nFilters == 0 && search->nRetrieveFields <= 0 && search->codeBody == NULL && search->pattern == NULL && search->rowVarNameObj == NULL && search->keyVarNameObj == NULL) {
2465 
2466 	    if (search->offset) {
2467 		quick_count -= search->offset;
2468 		if (quick_count < 0) {
2469 		    quick_count = 0;
2470 		}
2471 	    }
2472 
2473 	    if (search->limit) {
2474 		if (quick_count > search->limit) {
2475 		    quick_count = search->limit;
2476 		}
2477 	    }
2478 
2479 	    Tcl_SetObjResult (interp, Tcl_NewIntObj (quick_count));
2480 	    return TCL_RETURN;
2481 	}
2482     }
2483 
2484     if (search->action == CTABLE_SEARCH_ACTION_WRITE_TABSEP) {
2485 	ctable_checkForKey(ctable, search);
2486     } else {
2487 	if(search->writingTabsepIncludeFieldNames) {
2488 	    Tcl_AppendResult (interp, "can't use -with_field_names without -write_tabsep", (char *) NULL);
2489 	    return TCL_ERROR;
2490 	}
2491 
2492         if (search->codeBody == NULL && search->sortControl.nFields) {
2493 	    Tcl_AppendResult (interp, "Sorting must be accompanied by -code or -write_tabsep", (char *) NULL);
2494 	    return TCL_ERROR;
2495         }
2496     }
2497 
2498     return TCL_OK;
2499 
2500   actionOverload:
2501 
2502     Tcl_AppendResult (interp, "only one of -array, -array_with_nulls, -array_get, -array_get_with_nulls, or -write_tabsep must be specified", (char *) NULL);
2503 
2504   errorReturn:
2505 
2506     return TCL_ERROR;
2507 }
2508 
2509 //
2510 // ctable_elapsed_time - calculate the time interval between two timespecs and store in a new
2511 // timespec
2512 //
2513 void
ctable_elapsed_time(struct timespec * oldtime,struct timespec * newtime,struct timespec * elapsed)2514 ctable_elapsed_time (struct timespec *oldtime, struct timespec *newtime, struct timespec *elapsed) {
2515     elapsed->tv_sec = newtime->tv_sec - oldtime->tv_sec;
2516     elapsed->tv_nsec = newtime->tv_nsec - oldtime->tv_nsec;
2517 
2518     if (elapsed->tv_nsec < 0) {
2519 	elapsed->tv_nsec += 1000000000;
2520 	elapsed->tv_sec--;
2521     }
2522 }
2523 
2524 #ifdef CTABLES_CLOCK
2525 //
2526 // ctable_performance_callback - callback routine for performance of search calls
2527 //
2528 void
ctable_performance_callback(Tcl_Interp * interp,CTable * ctable,Tcl_Obj * CONST objv[],int objc,struct timespec * startTimeSpec,int loggingMatchCount)2529 ctable_performance_callback (Tcl_Interp *interp, CTable *ctable, Tcl_Obj *CONST objv[], int objc, struct timespec *startTimeSpec, int loggingMatchCount) {
2530     struct timespec endTimeSpec;
2531     struct timespec elapsedTimeSpec;
2532     Tcl_Obj *cmdObjv[4];
2533     double cpu;
2534     int i;
2535 
2536     // calculate elapsed cpu
2537 
2538     clock_gettime (CTABLES_CLOCK, &endTimeSpec);
2539     ctable_elapsed_time (startTimeSpec, &endTimeSpec, &elapsedTimeSpec);
2540     cpu = (elapsedTimeSpec.tv_sec + (elapsedTimeSpec.tv_nsec / 1000000000.0));
2541 
2542     if (cpu < ctable->performanceCallbackThreshold) {
2543 	return;
2544     }
2545 
2546     cmdObjv[0] = Tcl_NewStringObj (ctable->performanceCallback, -1);
2547     cmdObjv[1] = Tcl_NewListObj (objc, objv);
2548     cmdObjv[2] = Tcl_NewIntObj (loggingMatchCount);
2549     cmdObjv[3] = Tcl_NewDoubleObj (cpu);
2550 
2551     for (i = 0; i < 4; i++) {
2552 	Tcl_IncrRefCount (cmdObjv[i]);
2553     }
2554 
2555     if (Tcl_EvalObjv (interp, 4, cmdObjv, 0) == TCL_ERROR) {
2556 	Tcl_BackgroundError (interp);
2557     }
2558 
2559     for (i = 0; i < 4; i++) {
2560 	Tcl_DecrRefCount (cmdObjv[i]);
2561     }
2562 
2563 }
2564 #endif
2565 
2566 //
2567 // ctable_TeardownSearch - tear down (free) a search structure and the
2568 //  stuff within it.
2569 //
2570 static void
ctable_TeardownSearch(CTableSearch * search)2571 ctable_TeardownSearch (CTableSearch *search) {
2572     int i;
2573 
2574     if (search->filters) {
2575 	ckfree((char*)search->filters);
2576 	search->filters = NULL;
2577     }
2578 
2579     if (search->components == NULL) {
2580         return;
2581     }
2582 
2583     // teardown components
2584     for (i = 0; i < search->nComponents; i++) {
2585 	CTableSearchComponent  *component = &search->components[i];
2586 
2587 	if (component->row1 != NULL) {
2588 	    search->ctable->creator->delete_row (search->ctable, component->row1, CTABLE_INDEX_PRIVATE);
2589 	}
2590 
2591 	if (component->row2 != NULL) {
2592 	    search->ctable->creator->delete_row (search->ctable, component->row2, CTABLE_INDEX_PRIVATE);
2593 	}
2594 
2595 	if (component->row3 != NULL) {
2596 	    search->ctable->creator->delete_row (search->ctable, component->row3, CTABLE_INDEX_PRIVATE);
2597 	}
2598 
2599 	if (component->clientData != NULL) {
2600 	    // this needs to be pluggable
2601 	    if ((component->comparisonType == CTABLE_COMP_MATCH) || (component->comparisonType == CTABLE_COMP_NOTMATCH) || (component->comparisonType == CTABLE_COMP_MATCH_CASE) || (component->comparisonType == CTABLE_COMP_NOTMATCH_CASE)) {
2602 		struct ctableSearchMatchStruct *sm = (struct ctableSearchMatchStruct*) component->clientData;
2603 		if (sm->type == CTABLE_STRING_MATCH_UNANCHORED) {
2604 		    boyer_moore_teardown (sm);
2605 		}
2606 	    }
2607 
2608 	    ckfree ((char*)component->clientData);
2609 	}
2610 
2611 	ctable_FreeInRows(search->ctable, component);
2612     }
2613 
2614     ckfree ((char *)search->components);
2615     search->components = NULL;
2616 
2617     if (search->sortControl.fields != NULL) {
2618         ckfree ((char *)search->sortControl.fields);
2619 	search->sortControl.fields = NULL;
2620         ckfree ((char *)search->sortControl.directions);
2621 	search->sortControl.directions = NULL;
2622     }
2623 
2624     if (search->retrieveFields != NULL) {
2625 	ckfree ((char *)search->retrieveFields);
2626 	search->retrieveFields = NULL;
2627     }
2628 
2629     if (search->tranTable != NULL) {
2630         ckfree ((char *)search->tranTable);
2631         search->tranTable = NULL;
2632     }
2633 }
2634 
2635 //
2636 // ctable_SetupAndPerformSearch - setup and perform a (possibly) skiplist search
2637 //   on a table.
2638 //
2639 // Uses a skip list index as the outer loop.  Still brute force unless the
2640 // foremost compare routine is tailorable, however even so, much faster
2641 // than a hash table walk.
2642 //
2643 //
2644 CTABLE_INTERNAL int
ctable_SetupAndPerformSearch(Tcl_Interp * interp,Tcl_Obj * CONST objv[],int objc,CTable * ctable,int indexField)2645 ctable_SetupAndPerformSearch (Tcl_Interp *interp, Tcl_Obj *CONST objv[], int objc, CTable *ctable, int indexField) {
2646     CTableSearch    search;
2647     int result;
2648 #ifdef CTABLES_CLOCK
2649     struct timespec startTimeSpec;
2650     int loggingMatchCount = 0;
2651 #endif
2652 
2653 
2654 #ifdef CTABLES_CLOCK
2655     if (ctable->performanceCallbackEnable) {
2656 	clock_gettime (CTABLES_CLOCK, &startTimeSpec);
2657     }
2658 #endif
2659 
2660     // flag this search in progress
2661     CTableSearch *previous_search = ctable->searches;
2662     ctable->searches = &search;
2663 
2664     result = ctable_SetupSearch (interp, ctable, objv, objc, &search, indexField, previous_search);
2665     if (result == TCL_ERROR) {
2666         ctable->searches = previous_search;
2667         return TCL_ERROR;
2668     }
2669 
2670     // return from "setup" means "search optimized away"
2671     if (result == TCL_RETURN) {
2672 	result = TCL_OK;
2673     } else {
2674         result = ctable_PerformSearch (interp, ctable, &search);
2675     }
2676 
2677 #ifdef CTABLES_CLOCK
2678     if (ctable->performanceCallbackEnable) {
2679 	loggingMatchCount = search.matchCount;
2680     }
2681 #endif
2682 
2683     ctable_TeardownSearch (&search);
2684 
2685     ctable->searches = previous_search;
2686 
2687 #ifdef CTABLES_CLOCK
2688     if (ctable->performanceCallbackEnable) {
2689 	Tcl_Obj *saveResultObj = Tcl_GetObjResult (interp);
2690 	Tcl_IncrRefCount (saveResultObj);
2691 	ctable_performance_callback (interp, ctable, objv, objc, &startTimeSpec, loggingMatchCount);
2692 	Tcl_SetObjResult (interp, saveResultObj);
2693     }
2694 #endif
2695 
2696     return result;
2697 }
2698 
2699 //
2700 // ctable_DropIndex - delete all the rows in a row's index, free the
2701 // structure and set the field's pointer to the skip list to NULL
2702 //
2703 // "final" means "we're destroying the ctable". This allows us to avoid
2704 // deleting structures in shared memory that are going away anyway.
2705 //
2706 CTABLE_INTERNAL void
ctable_DropIndex(CTable * ctable,int field,int final)2707 ctable_DropIndex (CTable *ctable, int field, int final) {
2708     jsw_skip_t *skip = ctable->skipLists[field];
2709     ctable_BaseRow *row;
2710     int listIndexNumber = ctable->creator->fields[field]->indexNumber;
2711 
2712     if (skip == NULL) return;
2713 
2714     // Forget I had a skiplist
2715     ctable->skipLists[field] = NULL;
2716 
2717     // Delete the skiplist
2718     jsw_sdelete_skiplist (skip, final);
2719 
2720     // Don't need to reset the bucket list if it's just going to be deleted
2721     if(final) return;
2722 
2723     // Walk the table and erase the bucket lists for the row
2724     // We're walking the main index for the table so it's safe to
2725     // clear the index for $field
2726     CTABLE_LIST_FOREACH (ctable->ll_head, row, 0) {
2727 	row->_ll_nodes[listIndexNumber].next = NULL;
2728 	row->_ll_nodes[listIndexNumber].prev = NULL;
2729 	row->_ll_nodes[listIndexNumber].head = NULL;
2730     }
2731 }
2732 
2733 //
2734 // ctable_DropAllIndexes - delete all of a table's indexes
2735 //
2736 CTABLE_INTERNAL void
ctable_DropAllIndexes(CTable * ctable,int final)2737 ctable_DropAllIndexes (CTable *ctable, int final) {
2738     int field;
2739 
2740     for (field = 0; field < ctable->creator->nFields; field++) {
2741         ctable_DropIndex (ctable, field, final);
2742     }
2743 }
2744 
2745 //
2746 // ctable_IndexCount -- set the Tcl interpreter obj result to the
2747 //                      number of items in the index
2748 //
2749 // mostly just going to be used as a cross-check for testing to make sure
2750 // inserts and deletes into indexes corresponding to changes in rows works
2751 // properly
2752 //
2753 CTABLE_INTERNAL int
ctable_IndexCount(Tcl_Interp * interp,CTable * ctable,int field)2754 ctable_IndexCount (Tcl_Interp *interp, CTable *ctable, int field) {
2755     jsw_skip_t *skip = ctable->skipLists[field];
2756     int         count;
2757 
2758     if (skip == NULL) {
2759 	Tcl_AppendResult (interp, "that field does not have an index", (char *) NULL);
2760 	return TCL_ERROR;
2761     }
2762 
2763     count = (int)jsw_ssize(skip);
2764     Tcl_SetObjResult (interp, Tcl_NewIntObj (count));
2765     return TCL_OK;
2766 }
2767 
2768 CTABLE_INTERNAL int
ctable_DumpIndex(CTable * ctable,int field)2769 ctable_DumpIndex (CTable *ctable, int field) {
2770     jsw_skip_t *skip = ctable->skipLists[field];
2771     ctable_BaseRow *row;
2772     Tcl_Obj    *utilityObj = Tcl_NewObj ();
2773     CONST char *s;
2774 
2775     if (skip == NULL) {
2776         return TCL_OK;
2777     }
2778 
2779     jsw_dump_head (skip);
2780 
2781     for (jsw_sreset (skip); (row = jsw_srow (skip)) != NULL; jsw_snext(skip)) {
2782         s = ctable->creator->get_string (row, field, NULL, utilityObj);
2783 	jsw_dump (s, skip, ctable->creator->fields[field]->indexNumber);
2784     }
2785 
2786     Tcl_DecrRefCount (utilityObj);
2787 
2788     return TCL_OK;
2789 }
2790 
2791 
2792 //
2793 // ctable_ListIndex - return a list of all of the index values
2794 //
2795 // warning - can be hugely inefficient if you have a zillion elements
2796 // but useful for testing
2797 //
2798 CTABLE_INTERNAL int
ctable_ListIndex(Tcl_Interp * interp,CTable * ctable,int fieldNum)2799 ctable_ListIndex (Tcl_Interp *interp, CTable *ctable, int fieldNum) {
2800     jsw_skip_t *skip = ctable->skipLists[fieldNum];
2801     ctable_BaseRow    *p;
2802     Tcl_Obj    *resultObj = Tcl_GetObjResult (interp);
2803 
2804     if (skip == NULL) {
2805         return TCL_OK;
2806     }
2807 
2808     for (jsw_sreset (skip); (p = jsw_srow (skip)) != NULL; jsw_snext(skip)) {
2809 
2810         if (ctable->creator->lappend_field (interp, resultObj, p, fieldNum) == TCL_ERROR) {
2811 	    Tcl_AppendResult (interp, " while walking index fields", (char *) NULL);
2812 	    return TCL_ERROR;
2813 	}
2814     }
2815 
2816     return TCL_OK;
2817 }
2818 
2819 CTABLE_INTERNAL INLINE void
ctable_RemoveFromIndex(CTable * ctable,void * vRow,int field)2820 ctable_RemoveFromIndex (CTable *ctable, void *vRow, int field) {
2821     jsw_skip_t *skip = ctable->skipLists[field];
2822     ctable_BaseRow *row = (ctable_BaseRow*) vRow;
2823     int index = ctable->creator->fields[field]->indexNumber;
2824 
2825 #ifdef SEARCHDEBUG
2826 printf("ctable_RemoveFromIndex row 0x%lx, field %d (%s) skip == 0x%lx\n", (long)row, field, ctable->creator->fieldNames[field], (long unsigned int)skip);
2827 if(field == TRACKFIELD) {
2828   printf("BEFORE=  ");
2829   ctable_DumpIndex (ctable, field);
2830 }
2831 #endif
2832     // jsw_dump_head(skip);
2833 
2834     if (skip == NULL) {
2835 //printf("no skiplist\n");
2836         return;
2837     }
2838 
2839     // invariant: prev is never NULL if in list
2840     if(row->_ll_nodes[index].prev == NULL) {
2841 //printf("not in list\n");
2842 	return;
2843     }
2844     if (ctable_ListRemoveMightBeTheLastOne (row, index)) {
2845 //printf("i might be the last one, field %d\n", field);
2846         // it might be the last one, see if it really was
2847 //printf ("row->_ll_nodes[index].head %lx\n", (long unsigned int)row->_ll_nodes[index].head);
2848 	if (*row->_ll_nodes[index].head == NULL) {
2849 //printf("erasing last entry field %d\n", field);
2850             // put the pointer back so the compare routine will have
2851 	    // something to match
2852             *row->_ll_nodes[index].head = row;
2853 	    if (!jsw_serase (skip, row)) {
2854 		fprintf (stderr, "Attempted to remove non-existent field %s\n", ctable->creator->fields[field]->name);
2855 	    }
2856 	    *row->_ll_nodes[index].head = NULL; // don't think this is needed, but do it anyway
2857 	}
2858     }
2859 #ifdef SEARCHDEBUG
2860 if(field == TRACKFIELD) {
2861   printf("AFTER=  ");
2862   ctable_DumpIndex (ctable, field);
2863 }
2864 fflush(stdout);
2865 #endif
2866 
2867     return;
2868 }
2869 
2870 //
2871 // ctable_RemoveFromAllIndexes -- remove a row from all of the indexes it's
2872 // in -- this does a bidirectional linked list remove for each
2873 //
2874 //
2875 //
2876 CTABLE_INTERNAL void
ctable_RemoveFromAllIndexes(CTable * ctable,ctable_BaseRow * row)2877 ctable_RemoveFromAllIndexes (CTable *ctable, ctable_BaseRow *row) {
2878     int         field;
2879 
2880     // everybody's in index 0, take this guy out
2881     ctable_ListRemove (row, 0);
2882 
2883     // NB slightly gross, we shouldn't have to look at all of the fields
2884     // to even see which ones could be indexed but the programmer is
2885     // in a hurry
2886     for (field = 0; field < ctable->creator->nFields; field++) {
2887 	if (ctable->skipLists[field] != NULL) {
2888 	    ctable_RemoveFromIndex (ctable, row, field);
2889 	}
2890     }
2891 }
2892 
2893 //
2894 // ctable_InsertIntoIndex - for the given field of the given row of the given
2895 // ctable, insert this row into that table's field's index if there is an
2896 // index on that field.
2897 //
2898 CTABLE_INTERNAL INLINE int
ctable_InsertIntoIndex(Tcl_Interp * interp,CTable * ctable,ctable_BaseRow * row,int field)2899 ctable_InsertIntoIndex (Tcl_Interp *interp, CTable *ctable, ctable_BaseRow *row, int field) {
2900     jsw_skip_t *skip = ctable->skipLists[field];
2901     ctable_FieldInfo *f;
2902     Tcl_Obj *utilityObj;
2903     ctable_CreatorTable *creator = ctable->creator;
2904     int index = creator->fields[field]->indexNumber;
2905 
2906     if (skip == NULL) {
2907     return TCL_OK;
2908     }
2909 
2910     // invariant: prev is always NULL if not in list
2911     if(row->_ll_nodes[index].prev != NULL) {
2912 	Tcl_Panic ("Double insert row for field %s", ctable->creator->fields[field]->name);
2913     }
2914 
2915 #ifdef SANITY_CHECKS
2916     creator->sanity_check_pointer(ctable, (void *)row, CTABLE_INDEX_NORMAL, "ctable_InsertIntoIndex : row");
2917     creator->sanity_check_pointer(ctable, (void *)skip, CTABLE_INDEX_NORMAL, "ctable_InsertIntoIndex : skip");
2918 #endif
2919 
2920     f = creator->fields[field];
2921 
2922 # ifdef SEARCHDEBUG
2923 // dump info about row being inserted
2924 utilityObj = Tcl_NewObj();
2925 printf("ctable_InsertIntoIndex row 0x%lx, field %d, field name %s, index %d, value '%s'\n", (long)row, field, f->name, f->indexNumber, ctable->creator->get_string (row, field, NULL, utilityObj));
2926 fflush(stdout);
2927 Tcl_DecrRefCount (utilityObj);
2928 if(field == TRACKFIELD) {
2929   printf("BEFORE=  ");
2930   ctable_DumpIndex (ctable, field);
2931 }
2932 #endif
2933 
2934     if (!jsw_sinsert_linked (skip, row, f->indexNumber, f->unique)) {
2935 
2936 	utilityObj = Tcl_NewObj();
2937 	Tcl_AppendResult (interp, "unique check failed for field \"", f->name, "\", value \"", ctable->creator->get_string (row, field, NULL, utilityObj), "\"", (char *) NULL);
2938 	Tcl_DecrRefCount (utilityObj);
2939         return TCL_ERROR;
2940     }
2941 
2942 # ifdef SEARCHDEBUG
2943     // ctable_verifyField(ctable, field, 0);
2944 if(field == TRACKFIELD) {
2945   printf("AFTER=  ");
2946   ctable_DumpIndex (ctable, field);
2947 }
2948 fflush(stdout);
2949 #endif
2950 
2951     return TCL_OK;
2952 }
2953 
2954 //
2955 // ctable_CreateIndex - create an index on a specified field of a specified
2956 // ctable.
2957 //
2958 CTABLE_INTERNAL int
ctable_CreateIndex(Tcl_Interp * interp,CTable * ctable,int field,int depth)2959 ctable_CreateIndex (Tcl_Interp *interp, CTable *ctable, int field, int depth) {
2960     ctable_BaseRow *row;
2961 
2962     jsw_skip_t      *skip;
2963 
2964     // make sure the field has an index set up for it
2965     // in the linked list nodes of the row.
2966     if (ctable->creator->fields[field]->indexNumber < 0) {
2967 	Tcl_AppendResult (interp, "can't create an index on field '", ctable->creator->fields[field]->name, "' that hasn't been defined as having an index", (char *)NULL);
2968 	return TCL_ERROR;
2969     }
2970 
2971     // make sure we're allowed to do this
2972 #ifdef WITH_SHARED_TABLES
2973     if(ctable->share_type == CTABLE_SHARED_READER) {
2974 	Tcl_AppendResult (interp, "can't create an index on a read-only table", (char *)NULL);
2975 	return TCL_ERROR;
2976     }
2977 #endif
2978 
2979     // if there's already a skip list, just say "fine"
2980     if ((skip = ctable->skipLists[field]) != NULL) {
2981         return TCL_OK;
2982     }
2983 
2984 #ifdef WITH_SHARED_TABLES
2985     if(ctable->share_type == CTABLE_SHARED_MASTER)
2986         skip = jsw_snew (depth, ctable->creator->fields[field]->compareFunction, ctable->share);
2987     else
2988 #endif
2989         skip = jsw_snew (depth, ctable->creator->fields[field]->compareFunction, NULL);
2990 
2991     // we should plug the list in last, so that concurrent users don't
2992     // walk an incomplete skiplist, but ctable_InsertIntoIndex needs this
2993     // TODO: make ctable_InsertIntoIndex a wrapper around a new "ctable_InsertIntoSkiplist"?
2994     ctable->skipLists[field] = skip;
2995 
2996     // Walk the whole table to create the index
2997     CTABLE_LIST_FOREACH (ctable->ll_head, row, 0) {
2998 	// we want to be able to call out to an error handler rather
2999 	// than fail and unwind the stack.
3000 	// (not here so much as in read_tabsep because here we just unwind
3001 	// and undo the new index if we get an error)
3002 	if (ctable_InsertIntoIndex (interp, ctable, row, field) == TCL_ERROR) {
3003 	    // you can't leave them with a partial index or there will
3004 	    // be heck to pay later when queries don't find all the
3005 	    // rows, etc
3006 	    jsw_sdelete_skiplist (skip, 0);
3007 	    ctable->skipLists[field] = NULL;
3008 	    Tcl_AppendResult (interp, " while creating index", (char *) NULL);
3009 	    return TCL_ERROR;
3010 	}
3011     }
3012 
3013     return TCL_OK;
3014 }
3015 
3016 CTABLE_INTERNAL int
ctable_LappendIndexLowAndHi(Tcl_Interp * interp,CTable * ctable,int field)3017 ctable_LappendIndexLowAndHi (Tcl_Interp *interp, CTable *ctable, int field) {
3018     jsw_skip_t            *skip = ctable->skipLists[field];
3019     ctable_BaseRow        *row;
3020     Tcl_Obj               *resultObj = Tcl_GetObjResult (interp);
3021 
3022     if (skip == NULL) {
3023 	Tcl_AppendResult (interp, "that field isn't indexed", (char *)NULL);
3024 	return TCL_ERROR;
3025     }
3026 
3027     jsw_sreset (skip);
3028     row = jsw_srow (skip);
3029 
3030     if (row == NULL) {
3031         return TCL_OK;
3032     }
3033 
3034     if (ctable->creator->lappend_field (interp, resultObj, row, ctable->creator->fieldList[field]) == TCL_ERROR) {
3035         return TCL_ERROR;
3036     }
3037 
3038     jsw_findlast (skip);
3039     row = jsw_srow (skip);
3040 
3041     if (ctable->creator->lappend_field (interp, resultObj, row, ctable->creator->fieldList[field]) == TCL_ERROR) {
3042         return TCL_ERROR;
3043     }
3044 
3045     return TCL_OK;
3046 }
3047 
3048 CTABLE_INTERNAL int
ctable_DestroyCursor(Tcl_Interp * interp,struct cursor * cursor)3049 ctable_DestroyCursor(Tcl_Interp *interp, struct cursor *cursor)
3050 {
3051     if(!cursor->ownerTable) return TCL_OK; // being deleted
3052     CTable *ctable = cursor->ownerTable;
3053     cursor->ownerTable = NULL;
3054 
3055     struct cursor *prev = NULL;
3056     struct cursor *next = ctable->cursors;
3057     while(next && next != cursor) {
3058 	prev = next;
3059 	next = next->nextCursor;
3060     }
3061     if(!next) return TCL_OK; // already deleted
3062 
3063     // Remove from ctable
3064     if(prev)
3065 	prev->nextCursor = next->nextCursor;
3066     else
3067 	ctable->cursors = next->nextCursor;
3068 
3069     if(cursor->tranTable) {
3070 	ckfree(cursor->tranTable);
3071 	cursor->tranTable = NULL;
3072     }
3073 
3074     if(cursor->cursorName) {
3075 	ckfree(cursor->cursorName);
3076 	cursor->cursorName = NULL;
3077     }
3078 
3079     if(interp && cursor->commandInfo) {
3080 	Tcl_Command tmp = cursor->commandInfo;
3081 	cursor->commandInfo = NULL;
3082 	Tcl_DeleteCommandFromToken(interp, tmp);
3083     }
3084 
3085 #ifdef WITH_SHARED_TABLES
3086     // destroying a read-locked cursor? Tag the whole ctable as locked
3087     if(cursor->lockCycle != LOST_HORIZON)
3088 	ctable->cursorLock = 1;
3089 
3090     if(!ctable->cursors) {
3091 	// if the table was tagged as locked, and this is the last cursor, unlock it
3092 	if(ctable->cursorLock) {
3093 	    read_unlock(ctable->share);
3094 	    ctable->cursorLock = 0;
3095 	}
3096         end_write(ctable);
3097     }
3098 #endif
3099 
3100     ckfree(cursor);
3101 
3102     return TCL_OK;
3103 }
3104 
3105 CTABLE_INTERNAL struct cursor *
ctable_CreateEmptyCursor(Tcl_Interp * interp,CTable * ctable,char * cursorName)3106 ctable_CreateEmptyCursor(Tcl_Interp *interp, CTable *ctable, char *cursorName)
3107 {
3108 	// ALLOCATE and build new cursor
3109         struct cursor *cursor = (struct cursor *)ckalloc(sizeof (struct cursor));
3110 
3111 	// CREATE empty transaction table (1 element, empty)
3112 	cursor->tranTable = (ctable_BaseRow **)ckalloc (sizeof (ctable_BaseRow *));
3113 	cursor->tranTable[0] = (ctable_BaseRow *)NULL;
3114 	cursor->tranIndex = 0;
3115 
3116 	// INITIALIZE default values
3117         cursor->cursorName = cursorName;
3118         cursor->offset = 0;
3119         cursor->offsetLimit = 0;
3120 	cursor->commandInfo = NULL;
3121 #ifdef WITH_SHARED_TABLES
3122 	cursor->lockCycle = LOST_HORIZON;
3123 #endif
3124 
3125 	// INSERT cursor into cursor list
3126         cursor->nextCursor = ctable->cursors;
3127         ctable->cursors = cursor;
3128 
3129 	// SAVE ctable link in cursor
3130         cursor->ownerTable = ctable;
3131 
3132 	// MARK cursor as valid
3133 	cursor->cursorState = CTABLE_CURSOR_OK;
3134 
3135 	return cursor;
3136 }
3137 
3138 CTABLE_INTERNAL struct cursor *
ctable_CreateCursor(Tcl_Interp * interp,CTable * ctable,CTableSearch * search)3139 ctable_CreateCursor(Tcl_Interp *interp, CTable *ctable, CTableSearch *search)
3140 {
3141 	// Defense - if it's already been created from this search, or the search doesn't need a cursor, drop it
3142 	if (!search->tranTable || !search->cursorName) return NULL;
3143 
3144 	// ALLOCATE and build new cursor
3145         struct cursor *cursor = (struct cursor *)ckalloc(sizeof (struct cursor));
3146 
3147 	// MOVE transaction table naem cursor name to cursor
3148         cursor->tranTable = search->tranTable;
3149         search->tranTable = NULL;
3150         cursor->cursorName = search->cursorName;
3151 	search->cursorName = NULL;
3152 
3153 	// COPY search cursor info to cursor
3154         cursor->offset = cursor->tranIndex = search->offset;
3155         cursor->offsetLimit = search->offsetLimit;
3156 
3157 	// INSERT cursor into cursor list
3158         cursor->nextCursor = ctable->cursors;
3159         ctable->cursors = cursor;
3160 
3161 	// SAVE ctable link in cursor
3162         cursor->ownerTable = ctable;
3163 
3164 	// MARK cursor as valid
3165 	cursor->cursorState = CTABLE_CURSOR_OK;
3166 
3167 	// INIT remaining feilds
3168 	cursor->commandInfo = NULL;
3169 #ifdef WITH_SHARED_TABLES
3170 	cursor->lockCycle = LOST_HORIZON;
3171 #endif
3172 
3173 	return cursor;
3174 }
3175 
3176 CTABLE_INTERNAL void
ctable_DeleteCursorCommand(ClientData clientData)3177 ctable_DeleteCursorCommand(ClientData clientData)
3178 {
3179 	struct cursor *cursor = (struct cursor *)clientData;
3180 
3181 	// Make sure we don't lead ourselves back here
3182 	cursor->commandInfo = NULL;
3183 
3184 	// Remove the cursor from the ctable and destroy it
3185 	ctable_DestroyCursor(NULL, cursor);
3186 }
3187 
3188 CTABLE_INTERNAL int
ctable_CreateCursorCommand(Tcl_Interp * interp,struct cursor * cursor)3189 ctable_CreateCursorCommand(Tcl_Interp *interp, struct cursor *cursor)
3190 {
3191 	if(cursor->commandInfo) return TCL_OK; // already exists
3192 
3193 	Tcl_ObjCmdProc *commandProc = cursor->ownerTable->creator->cursor_command;
3194 
3195 	cursor->commandInfo = Tcl_CreateObjCommand(interp, cursor->cursorName, commandProc, (ClientData)cursor, ctable_DeleteCursorCommand);
3196 
3197 	return cursor->commandInfo ? TCL_OK : TCL_ERROR;
3198 }
3199 
3200 CTABLE_INTERNAL Tcl_Obj *
ctable_CursorToName(struct cursor * cursor)3201 ctable_CursorToName(struct cursor *cursor)
3202 {
3203 	return Tcl_NewStringObj(cursor->cursorName, -1);
3204 }
3205 
3206 
3207 // vim: set ts=8 sw=4 sts=4 noet :
3208