1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2007, 2009, 2010, 2013 Free Software Foundation, Inc.
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/casereader.h"
20 #include "data/casereader-provider.h"
21
22 #include <stdlib.h>
23
24 #include "data/casereader-shim.h"
25 #include "data/casewriter.h"
26 #include "libpspp/assertion.h"
27 #include "libpspp/heap.h"
28 #include "libpspp/taint.h"
29
30 #include "gl/xalloc.h"
31
32 /* A casereader. */
33 struct casereader
34 {
35 struct taint *taint; /* Corrupted? */
36 struct caseproto *proto; /* Format of contained cases. */
37 casenumber case_cnt; /* Number of cases,
38 CASENUMBER_MAX if unknown. */
39 const struct casereader_class *class; /* Class. */
40 void *aux; /* Auxiliary data for class. */
41 };
42
43 /* Reads and returns the next case from READER. The caller owns
44 the returned case and must call case_unref on it when its data
45 is no longer needed. Returns a null pointer if cases have
46 been exhausted or upon detection of an I/O error.
47
48 The case returned is effectively consumed: it can never be
49 read again through READER. If this is inconvenient, READER
50 may be cloned in advance with casereader_clone, or
51 casereader_peek may be used instead. */
52 struct ccase *
casereader_read(struct casereader * reader)53 casereader_read (struct casereader *reader)
54 {
55 if (reader->case_cnt != 0)
56 {
57 /* ->read may use casereader_swap to replace itself by
58 another reader and then delegate to that reader by
59 recursively calling casereader_read. Currently only
60 lazy_casereader does this and, with luck, nothing else
61 ever will.
62
63 To allow this to work, however, we must decrement
64 case_cnt before calling ->read. If we decremented
65 case_cnt after calling ->read, then this would actually
66 drop two cases from case_cnt instead of one, and we'd
67 lose the last case in the casereader. */
68 struct ccase *c;
69 if (reader->case_cnt != CASENUMBER_MAX)
70 reader->case_cnt--;
71 c = reader->class->read (reader, reader->aux);
72 if (c != NULL)
73 {
74 size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
75 assert (case_get_value_cnt (c) >= n_widths);
76 expensive_assert (caseproto_equal (case_get_proto (c), 0,
77 reader->proto, 0, n_widths));
78 return c;
79 }
80 }
81 reader->case_cnt = 0;
82 return NULL;
83 }
84
85 /* Destroys READER.
86 Returns false if an I/O error was detected on READER, true
87 otherwise. */
88 bool
casereader_destroy(struct casereader * reader)89 casereader_destroy (struct casereader *reader)
90 {
91 bool ok = true;
92 if (reader != NULL)
93 {
94 reader->class->destroy (reader, reader->aux);
95 ok = taint_destroy (reader->taint);
96 caseproto_unref (reader->proto);
97 free (reader);
98 }
99 return ok;
100 }
101
102 /* Returns a clone of READER. READER and its clone may be used
103 to read the same sequence of cases in the same order, barring
104 I/O errors. */
105 struct casereader *
casereader_clone(const struct casereader * reader_)106 casereader_clone (const struct casereader *reader_)
107 {
108 struct casereader *reader = CONST_CAST (struct casereader *, reader_);
109 struct casereader *clone;
110 if (reader == NULL)
111 return NULL;
112
113 if (reader->class->clone == NULL)
114 casereader_shim_insert (reader);
115 clone = reader->class->clone (reader, reader->aux);
116 assert (clone != NULL);
117 assert (clone != reader);
118 return clone;
119 }
120
121 /* Returns a copy of READER, which is itself destroyed.
122 Useful for taking over ownership of a casereader, to enforce
123 preventing the original owner from accessing the casereader
124 again. */
125 struct casereader *
casereader_rename(struct casereader * reader)126 casereader_rename (struct casereader *reader)
127 {
128 struct casereader *new = xmemdup (reader, sizeof *reader);
129 free (reader);
130 return new;
131 }
132
133 /* Exchanges the casereaders referred to by A and B. */
134 void
casereader_swap(struct casereader * a,struct casereader * b)135 casereader_swap (struct casereader *a, struct casereader *b)
136 {
137 if (a != b)
138 {
139 struct casereader tmp = *a;
140 *a = *b;
141 *b = tmp;
142 }
143 }
144
145 /* Reads and returns the (IDX + 1)'th case from READER. The
146 caller owns the returned case and must call case_unref on it
147 when it is no longer needed. Returns a null pointer if cases
148 have been exhausted or upon detection of an I/O error. */
149 struct ccase *
casereader_peek(struct casereader * reader,casenumber idx)150 casereader_peek (struct casereader *reader, casenumber idx)
151 {
152 if (idx < reader->case_cnt)
153 {
154 struct ccase *c;
155 if (reader->class->peek == NULL)
156 casereader_shim_insert (reader);
157 c = reader->class->peek (reader, reader->aux, idx);
158 if (c != NULL)
159 return c;
160 else if (casereader_error (reader))
161 reader->case_cnt = 0;
162 }
163 if (reader->case_cnt > idx)
164 reader->case_cnt = idx;
165 return NULL;
166 }
167
168 /* Returns true if no cases remain to be read from READER, or if
169 an error has occurred on READER. (A return value of false
170 does *not* mean that the next call to casereader_peek or
171 casereader_read will return true, because an error can occur
172 in the meantime.) */
173 bool
casereader_is_empty(struct casereader * reader)174 casereader_is_empty (struct casereader *reader)
175 {
176 if (reader->case_cnt == 0)
177 return true;
178 else
179 {
180 struct ccase *c = casereader_peek (reader, 0);
181 if (c == NULL)
182 return true;
183 else
184 {
185 case_unref (c);
186 return false;
187 }
188 }
189 }
190
191 /* Returns true if an I/O error or another hard error has
192 occurred on READER, a clone of READER, or on some object on
193 which READER's data has a dependency, false otherwise. */
194 bool
casereader_error(const struct casereader * reader)195 casereader_error (const struct casereader *reader)
196 {
197 return taint_is_tainted (reader->taint);
198 }
199
200 /* Marks READER as having encountered an error.
201
202 Ordinarily, this function should be called by the
203 implementation of a casereader, not by the casereader's
204 client. Instead, casereader clients should usually ensure
205 that a casereader's error state is correct by using
206 taint_propagate to propagate to the casereader's taint
207 structure, which may be obtained via casereader_get_taint. */
208 void
casereader_force_error(struct casereader * reader)209 casereader_force_error (struct casereader *reader)
210 {
211 taint_set_taint (reader->taint);
212 }
213
214 /* Returns READER's associate taint object, for use with
215 taint_propagate and other taint functions. */
216 const struct taint *
casereader_get_taint(const struct casereader * reader)217 casereader_get_taint (const struct casereader *reader)
218 {
219 return reader->taint;
220 }
221
222 /* Returns the number of cases that will be read by successive
223 calls to casereader_read for READER, assuming that no errors
224 occur. Upon an error condition, the case count drops to 0, so
225 that no more cases can be obtained.
226
227 Not all casereaders can predict the number of cases that they
228 will produce without actually reading all of them. In that
229 case, this function returns CASENUMBER_MAX. To obtain the
230 actual number of cases in such a casereader, use
231 casereader_count_cases. */
232 casenumber
casereader_get_case_cnt(struct casereader * reader)233 casereader_get_case_cnt (struct casereader *reader)
234 {
235 return reader->case_cnt;
236 }
237
238 static casenumber
casereader_count_cases__(const struct casereader * reader,casenumber max_cases)239 casereader_count_cases__ (const struct casereader *reader,
240 casenumber max_cases)
241 {
242 struct casereader *clone;
243 casenumber n_cases;
244
245 clone = casereader_clone (reader);
246 n_cases = casereader_advance (clone, max_cases);
247 casereader_destroy (clone);
248
249 return n_cases;
250 }
251
252 /* Returns the number of cases that will be read by successive
253 calls to casereader_read for READER, assuming that no errors
254 occur. Upon an error condition, the case count drops to 0, so
255 that no more cases can be obtained.
256
257 For a casereader that cannot predict the number of cases it
258 will produce, this function actually reads (and discards) all
259 of the contents of a clone of READER. Thus, the return value
260 is always correct in the absence of I/O errors. */
261 casenumber
casereader_count_cases(const struct casereader * reader)262 casereader_count_cases (const struct casereader *reader)
263 {
264 if (reader->case_cnt == CASENUMBER_MAX)
265 {
266 struct casereader *reader_rw = CONST_CAST (struct casereader *, reader);
267 reader_rw->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX);
268 }
269 return reader->case_cnt;
270 }
271
272 /* Truncates READER to at most N cases. */
273 void
casereader_truncate(struct casereader * reader,casenumber n)274 casereader_truncate (struct casereader *reader, casenumber n)
275 {
276 /* This could be optimized, if it ever becomes too expensive, by adding a
277 "max_cases" member to struct casereader. We could also add a "truncate"
278 function to the casereader implementation, to allow the casereader to
279 throw away data that cannot ever be read. */
280 if (reader->case_cnt == CASENUMBER_MAX)
281 reader->case_cnt = casereader_count_cases__ (reader, n);
282 if (reader->case_cnt > n)
283 reader->case_cnt = n;
284 }
285
286 /* Returns the prototype for the cases in READER. The caller
287 must not unref the returned prototype. */
288 const struct caseproto *
casereader_get_proto(const struct casereader * reader)289 casereader_get_proto (const struct casereader *reader)
290 {
291 return reader->proto;
292 }
293
294 /* Skips past N cases in READER, stopping when the last case in
295 READER has been read or on an input error. Returns the number
296 of cases successfully skipped. */
297 casenumber
casereader_advance(struct casereader * reader,casenumber n)298 casereader_advance (struct casereader *reader, casenumber n)
299 {
300 casenumber i;
301
302 for (i = 0; i < n; i++)
303 {
304 struct ccase *c = casereader_read (reader);
305 if (c == NULL)
306 break;
307 case_unref (c);
308 }
309
310 return i;
311 }
312
313
314 /* Copies all the cases in READER to WRITER, propagating errors
315 appropriately. READER is destroyed by this function */
316 void
casereader_transfer(struct casereader * reader,struct casewriter * writer)317 casereader_transfer (struct casereader *reader, struct casewriter *writer)
318 {
319 struct ccase *c;
320
321 taint_propagate (casereader_get_taint (reader),
322 casewriter_get_taint (writer));
323 while ((c = casereader_read (reader)) != NULL)
324 casewriter_write (writer, c);
325 casereader_destroy (reader);
326 }
327
328 /* Creates and returns a new casereader. This function is
329 intended for use by casereader implementations, not by
330 casereader clients.
331
332 This function is most suited for creating a casereader for a
333 data source that is naturally sequential.
334 casereader_create_random may be more appropriate for a data
335 source that supports random access.
336
337 Ordinarily, specify a null pointer for TAINT, in which case
338 the new casereader will have a new, unique taint object. If
339 the new casereader should have a clone of an existing taint
340 object, specify that object as TAINT. (This is most commonly
341 useful in an implementation of the "clone" casereader_class
342 function, in which case the cloned casereader should have the
343 same taint object as the original casereader.)
344
345 PROTO must be the prototype for the cases that may be read
346 from the casereader. The caller retains its reference to
347 PROTO.
348
349 CASE_CNT is an upper limit on the number of cases that
350 casereader_read will return from the casereader in successive
351 calls. Ordinarily, this is the actual number of cases in the
352 data source or CASENUMBER_MAX if the number of cases cannot be
353 predicted in advance.
354
355 CLASS and AUX are a set of casereader implementation-specific
356 member functions and auxiliary data to pass to those member
357 functions, respectively. */
358 struct casereader *
casereader_create_sequential(const struct taint * taint,const struct caseproto * proto,casenumber case_cnt,const struct casereader_class * class,void * aux)359 casereader_create_sequential (const struct taint *taint,
360 const struct caseproto *proto,
361 casenumber case_cnt,
362 const struct casereader_class *class, void *aux)
363 {
364 struct casereader *reader = xmalloc (sizeof *reader);
365 reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
366 reader->proto = caseproto_ref (proto);
367 reader->case_cnt = case_cnt;
368 reader->class = class;
369 reader->aux = aux;
370 return reader;
371 }
372
373 /* If READER is a casereader of the given CLASS, returns its
374 associated auxiliary data; otherwise, returns a null pointer.
375
376 This function is intended for use from casereader
377 implementations, not by casereader users. Even within
378 casereader implementations, its usefulness is quite limited,
379 for at least two reasons. First, every casereader member
380 function already receives a pointer to the casereader's
381 auxiliary data. Second, a casereader's class can change
382 (through a call to casereader_swap) and this is in practice
383 quite common (e.g. any call to casereader_clone on a
384 casereader that does not directly support clone will cause the
385 casereader to be replaced by a shim caseader). */
386 void *
casereader_dynamic_cast(struct casereader * reader,const struct casereader_class * class)387 casereader_dynamic_cast (struct casereader *reader,
388 const struct casereader_class *class)
389 {
390 return reader->class == class ? reader->aux : NULL;
391 }
392
393 /* Random-access casereader implementation.
394
395 This is a set of wrappers around casereader_create_sequential
396 and struct casereader_class to make it easy to create
397 efficient casereaders for data sources that natively support
398 random access. */
399
400 /* One clone of a random reader. */
401 struct random_reader
402 {
403 struct random_reader_shared *shared; /* Data shared among clones. */
404 struct heap_node heap_node; /* Node in shared data's heap of readers. */
405 casenumber offset; /* Number of cases already read. */
406 };
407
408 /* Returns the random_reader in which the given heap_node is
409 embedded. */
410 static struct random_reader *
random_reader_from_heap_node(const struct heap_node * node)411 random_reader_from_heap_node (const struct heap_node *node)
412 {
413 return heap_data (node, struct random_reader, heap_node);
414 }
415
416 /* Data shared among clones of a random reader. */
417 struct random_reader_shared
418 {
419 struct heap *readers; /* Heap of struct random_readers. */
420 casenumber min_offset; /* Smallest offset of any random_reader. */
421 const struct casereader_random_class *class;
422 void *aux;
423 };
424
425 static const struct casereader_class random_reader_casereader_class;
426
427 /* Creates and returns a new random_reader with the given SHARED
428 data and OFFSET. Inserts the new random reader into the
429 shared heap. */
430 static struct random_reader *
make_random_reader(struct random_reader_shared * shared,casenumber offset)431 make_random_reader (struct random_reader_shared *shared, casenumber offset)
432 {
433 struct random_reader *br = xmalloc (sizeof *br);
434 br->offset = offset;
435 br->shared = shared;
436 heap_insert (shared->readers, &br->heap_node);
437 return br;
438 }
439
440 /* Compares random_readers A and B by offset and returns a
441 strcmp()-like result. */
442 static int
compare_random_readers_by_offset(const struct heap_node * a_,const struct heap_node * b_,const void * aux UNUSED)443 compare_random_readers_by_offset (const struct heap_node *a_,
444 const struct heap_node *b_,
445 const void *aux UNUSED)
446 {
447 const struct random_reader *a = random_reader_from_heap_node (a_);
448 const struct random_reader *b = random_reader_from_heap_node (b_);
449 return a->offset < b->offset ? -1 : a->offset > b->offset;
450 }
451
452 /* Creates and returns a new casereader. This function is
453 intended for use by casereader implementations, not by
454 casereader clients.
455
456 This function is most suited for creating a casereader for a
457 data source that supports random access.
458 casereader_create_sequential is more appropriate for a data
459 source that is naturally sequential.
460
461 PROTO must be the prototype for the cases that may be read
462 from the casereader. The caller retains its reference to
463 PROTO.
464
465 CASE_CNT is an upper limit on the number of cases that
466 casereader_read will return from the casereader in successive
467 calls. Ordinarily, this is the actual number of cases in the
468 data source or CASENUMBER_MAX if the number of cases cannot be
469 predicted in advance.
470
471 CLASS and AUX are a set of casereader implementation-specific
472 member functions and auxiliary data to pass to those member
473 functions, respectively. */
474 struct casereader *
casereader_create_random(const struct caseproto * proto,casenumber case_cnt,const struct casereader_random_class * class,void * aux)475 casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
476 const struct casereader_random_class *class,
477 void *aux)
478 {
479 struct random_reader_shared *shared = xmalloc (sizeof *shared);
480 shared->readers = heap_create (compare_random_readers_by_offset, NULL);
481 shared->class = class;
482 shared->aux = aux;
483 shared->min_offset = 0;
484 return casereader_create_sequential (NULL, proto, case_cnt,
485 &random_reader_casereader_class,
486 make_random_reader (shared, 0));
487 }
488
489 /* Reassesses the min_offset in SHARED based on the minimum
490 offset in the heap. */
491 static void
advance_random_reader(struct casereader * reader,struct random_reader_shared * shared)492 advance_random_reader (struct casereader *reader,
493 struct random_reader_shared *shared)
494 {
495 casenumber old, new;
496
497 old = shared->min_offset;
498 new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
499 assert (new >= old);
500 if (new > old)
501 {
502 shared->min_offset = new;
503 shared->class->advance (reader, shared->aux, new - old);
504 }
505 }
506
507 /* struct casereader_class "read" function for random reader. */
508 static struct ccase *
random_reader_read(struct casereader * reader,void * br_)509 random_reader_read (struct casereader *reader, void *br_)
510 {
511 struct random_reader *br = br_;
512 struct random_reader_shared *shared = br->shared;
513 struct ccase *c = shared->class->read (reader, shared->aux,
514 br->offset - shared->min_offset);
515 if (c != NULL)
516 {
517 br->offset++;
518 heap_changed (shared->readers, &br->heap_node);
519 advance_random_reader (reader, shared);
520 }
521 return c;
522 }
523
524 /* struct casereader_class "destroy" function for random
525 reader. */
526 static void
random_reader_destroy(struct casereader * reader,void * br_)527 random_reader_destroy (struct casereader *reader, void *br_)
528 {
529 struct random_reader *br = br_;
530 struct random_reader_shared *shared = br->shared;
531
532 heap_delete (shared->readers, &br->heap_node);
533 if (heap_is_empty (shared->readers))
534 {
535 heap_destroy (shared->readers);
536 shared->class->destroy (reader, shared->aux);
537 free (shared);
538 }
539 else
540 advance_random_reader (reader, shared);
541
542 free (br);
543 }
544
545 /* struct casereader_class "clone" function for random reader. */
546 static struct casereader *
random_reader_clone(struct casereader * reader,void * br_)547 random_reader_clone (struct casereader *reader, void *br_)
548 {
549 struct random_reader *br = br_;
550 struct random_reader_shared *shared = br->shared;
551 return casereader_create_sequential (casereader_get_taint (reader),
552 reader->proto,
553 casereader_get_case_cnt (reader),
554 &random_reader_casereader_class,
555 make_random_reader (shared,
556 br->offset));
557 }
558
559 /* struct casereader_class "peek" function for random reader. */
560 static struct ccase *
random_reader_peek(struct casereader * reader,void * br_,casenumber idx)561 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
562 {
563 struct random_reader *br = br_;
564 struct random_reader_shared *shared = br->shared;
565
566 return shared->class->read (reader, shared->aux,
567 br->offset - shared->min_offset + idx);
568 }
569
570 /* Casereader class for random reader. */
571 static const struct casereader_class random_reader_casereader_class =
572 {
573 random_reader_read,
574 random_reader_destroy,
575 random_reader_clone,
576 random_reader_peek,
577 };
578
579
580 static const struct casereader_class casereader_null_class;
581
582 /* Returns a casereader with no cases. The casereader has the prototype
583 specified by PROTO. PROTO may be specified as a null pointer, in which case
584 the casereader has no variables. */
585 struct casereader *
casereader_create_empty(const struct caseproto * proto_)586 casereader_create_empty (const struct caseproto *proto_)
587 {
588 struct casereader *reader;
589 struct caseproto *proto;
590
591 proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
592 reader = casereader_create_sequential (NULL, proto, 0,
593 &casereader_null_class, NULL);
594 caseproto_unref (proto);
595
596 return reader;
597 }
598
599 static struct ccase *
casereader_null_read(struct casereader * reader UNUSED,void * aux UNUSED)600 casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
601 {
602 return NULL;
603 }
604
605 static void
casereader_null_destroy(struct casereader * reader UNUSED,void * aux UNUSED)606 casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
607 {
608 /* Nothing to do. */
609 }
610
611 static const struct casereader_class casereader_null_class =
612 {
613 casereader_null_read,
614 casereader_null_destroy,
615 NULL, /* clone */
616 NULL, /* peek */
617 };
618