1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2007, 2009, 2010, 2011, 2012, 2013 Free Software Foundation, Inc.
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/datasheet.h"
20
21 #include <stdlib.h>
22 #include <string.h>
23
24 #include "data/casereader-provider.h"
25 #include "data/casereader.h"
26 #include "data/casewriter.h"
27 #include "data/lazy-casereader.h"
28 #include "data/settings.h"
29 #include "libpspp/array.h"
30 #include "libpspp/assertion.h"
31 #include "libpspp/misc.h"
32 #include "libpspp/range-map.h"
33 #include "libpspp/range-set.h"
34 #include "libpspp/sparse-xarray.h"
35 #include "libpspp/taint.h"
36 #include "libpspp/tower.h"
37
38 #include "gl/minmax.h"
39 #include "gl/md4.h"
40 #include "gl/xalloc.h"
41
42 struct column;
43
44 static struct axis *axis_create (void);
45 static struct axis *axis_clone (const struct axis *);
46 static void axis_destroy (struct axis *);
47
48 static void axis_hash (const struct axis *, struct md4_ctx *);
49
50 static bool axis_allocate (struct axis *, unsigned long int request,
51 unsigned long int *start,
52 unsigned long int *width);
53 static void axis_make_available (struct axis *,
54 unsigned long int start,
55 unsigned long int width);
56 static unsigned long int axis_extend (struct axis *, unsigned long int width);
57
58 static unsigned long int axis_map (const struct axis *, unsigned long log_pos);
59
60 static unsigned long axis_get_size (const struct axis *);
61 static void axis_insert (struct axis *,
62 unsigned long int log_start,
63 unsigned long int phy_start,
64 unsigned long int cnt);
65 static void axis_remove (struct axis *,
66 unsigned long int start, unsigned long int cnt);
67 static void axis_move (struct axis *,
68 unsigned long int old_start,
69 unsigned long int new_start,
70 unsigned long int cnt);
71
72 static struct source *source_create_empty (size_t n_bytes);
73 static struct source *source_create_casereader (struct casereader *);
74 static struct source *source_clone (const struct source *);
75 static void source_destroy (struct source *);
76
77 static casenumber source_get_backing_n_rows (const struct source *);
78
79 static int source_allocate_column (struct source *, int width);
80 static void source_release_column (struct source *, int ofs, int width);
81 static bool source_in_use (const struct source *);
82
83 static bool source_read (const struct column *, casenumber row, union value *,
84 size_t n);
85 static bool source_write (const struct column *, casenumber row,
86 const union value *, size_t n);
87 static bool source_write_column (struct column *, const union value *);
88 static bool source_has_backing (const struct source *);
89
90 static int get_source_index (const struct datasheet *ds, const struct source *source);
91
92 /* A datasheet is internally composed from a set of data files,
93 called "sources". The sources that make up a datasheet must
94 have the same number of rows (cases), but their numbers of
95 columns (variables) may vary.
96
97 A datasheet's external view is produced by mapping (permuting
98 and selecting) its internal data. Thus, we can rearrange or
99 delete rows or columns simply by modifying the mapping. We
100 add rows by adding rows to each source and to the row mapping.
101 We add columns by adding a new source, then adding that source
102 to the column mapping.
103
104 Each source in a datasheet can be a casereader or a
105 sparse_xarray. Casereaders are read-only, so when sources
106 made from casereaders need to be modified, it is done
107 "virtually" through being overlaid by a sparse_xarray. */
108 struct source
109 {
110 struct range_set *avail; /* Free bytes are set to 1s. */
111 struct sparse_xarray *data; /* Data at top level, atop the backing. */
112 struct casereader *backing; /* Backing casereader (or null). */
113 casenumber backing_rows; /* Number of rows in backing (if backed). */
114 size_t n_used; /* Number of column in use (if backed). */
115 };
116
/* A logical column: describes where one client-visible column
   lives inside the datasheet's sources. */
struct column
  {
    struct source *source;      /* Source of the underlying physical column. */
    int value_ofs;              /* If 'source' has a backing casereader,
                                   column's value offset in its cases. */
    int byte_ofs;               /* Byte offset in source's sparse_xarray. */
    int width;                  /* 0=numeric, otherwise string width. */
  };
126
/* A datasheet: a set of sources plus the column and row mappings
   that present them to clients as a single table. */
struct datasheet
  {
    /* Data sources. */
    struct source **sources;    /* Sources, in no particular order. */
    size_t n_sources;           /* Number of sources. */

    /* Columns. */
    struct caseproto *proto;    /* Prototype for rows (initialized lazily). */
    struct column *columns;     /* Logical to physical column mapping. */
    size_t n_columns;           /* Number of logical columns. */
    unsigned column_min_alloc;  /* Min. # of columns to put in a new source. */

    /* Rows. */
    struct axis *rows;          /* Logical to physical row mapping. */

    /* Tainting. */
    struct taint *taint;        /* Indicates corrupted data. */
  };
146
/* Direction of a row access performed by rw_case. */
enum rw_op
  {
    OP_READ,                    /* Read values out of the datasheet. */
    OP_WRITE                    /* Write values into the datasheet. */
  };
153
154 static void allocate_column (struct datasheet *, int width, struct column *);
155 static void release_source (struct datasheet *, struct source *);
156 static bool rw_case (struct datasheet *ds, enum rw_op op,
157 casenumber lrow, size_t start_column, size_t n_columns,
158 union value data[]);
159
/* Returns the number of bytes that a value of the given WIDTH
   occupies on disk: the size of a double for a numeric value
   (WIDTH 0), otherwise WIDTH itself. */
static size_t
width_to_n_bytes (int width)
{
  if (width == 0)
    return sizeof (double);
  return width;
}
167
168 /* Returns the address of the data in VALUE (for reading or
169 writing to/from disk). VALUE must have the given WIDTH. */
170 static void *
value_to_data(const union value * value_,int width)171 value_to_data (const union value *value_, int width)
172 {
173 union value *value = (union value *) value_;
174 assert (sizeof value->f == sizeof (double));
175 if (width == 0)
176 return &value->f;
177 else
178 return value->s;
179 }
180
/* Returns the total number of bytes needed to store on disk one
   value of every width in PROTO.  Widths less than zero do not
   contribute anything. */
static size_t
caseproto_to_n_bytes (const struct caseproto *proto)
{
  size_t total = 0;
  size_t idx = 0;

  while (idx < caseproto_get_n_widths (proto))
    {
      int w = caseproto_get_width (proto, idx++);
      if (w < 0)
        continue;
      total += width_to_n_bytes (w);
    }
  return total;
}
198
199 /* Creates and returns a new datasheet.
200
201 If READER is nonnull, then the datasheet initially contains
202 the contents of READER. */
203 struct datasheet *
datasheet_create(struct casereader * reader)204 datasheet_create (struct casereader *reader)
205 {
206 struct datasheet *ds = xmalloc (sizeof *ds);
207 ds->sources = NULL;
208 ds->n_sources = 0;
209 ds->proto = NULL;
210 ds->columns = NULL;
211 ds->n_columns = 0;
212 ds->column_min_alloc = 8;
213 ds->rows = axis_create ();
214 ds->taint = taint_create ();
215
216 if (reader != NULL)
217 {
218 casenumber n_rows;
219 size_t byte_ofs;
220 size_t i;
221
222 taint_propagate (casereader_get_taint (reader), ds->taint);
223
224 ds->proto = caseproto_ref (casereader_get_proto (reader));
225
226 ds->sources = xmalloc (sizeof *ds->sources);
227 ds->sources[0] = source_create_casereader (reader);
228 ds->n_sources = 1;
229
230 ds->n_columns = caseproto_get_n_widths (ds->proto);
231 ds->columns = xnmalloc (ds->n_columns, sizeof *ds->columns);
232 byte_ofs = 0;
233 for (i = 0; i < ds->n_columns; i++)
234 {
235 struct column *column = &ds->columns[i];
236 int width = caseproto_get_width (ds->proto, i);
237 column->source = ds->sources[0];
238 column->width = width;
239 if (width >= 0)
240 {
241 column->value_ofs = i;
242 column->byte_ofs = byte_ofs;
243 byte_ofs += width_to_n_bytes (column->width);
244 }
245 }
246
247 n_rows = source_get_backing_n_rows (ds->sources[0]);
248 if (n_rows > 0)
249 axis_insert (ds->rows, 0, axis_extend (ds->rows, n_rows), n_rows);
250 }
251
252 return ds;
253 }
254
255 /* Destroys datasheet DS. */
256 void
datasheet_destroy(struct datasheet * ds)257 datasheet_destroy (struct datasheet *ds)
258 {
259 size_t i;
260
261 if (ds == NULL)
262 return;
263
264 for (i = 0; i < ds->n_sources; i++)
265 source_destroy (ds->sources[i]);
266 free (ds->sources);
267 caseproto_unref (ds->proto);
268 free (ds->columns);
269 axis_destroy (ds->rows);
270 taint_destroy (ds->taint);
271 free (ds);
272 }
273
274 /* Returns the prototype for the cases in DS. The caller must
275 not unref the returned prototype. */
276 const struct caseproto *
datasheet_get_proto(const struct datasheet * ds_)277 datasheet_get_proto (const struct datasheet *ds_)
278 {
279 struct datasheet *ds = CONST_CAST (struct datasheet *, ds_);
280 if (ds->proto == NULL)
281 {
282 size_t i;
283
284 ds->proto = caseproto_create ();
285 for (i = 0; i < ds->n_columns; i++)
286 ds->proto = caseproto_add_width (ds->proto, ds->columns[i].width);
287 }
288 return ds->proto;
289 }
290
291 /* Returns the width of the given COLUMN within DS.
292 COLUMN must be less than the number of columns in DS. */
293 int
datasheet_get_column_width(const struct datasheet * ds,size_t column)294 datasheet_get_column_width (const struct datasheet *ds, size_t column)
295 {
296 assert (column < datasheet_get_n_columns (ds));
297 return ds->columns[column].width;
298 }
299
300 /* Moves datasheet DS to a new location in memory, and returns
301 the new location. Afterward, the datasheet must not be
302 accessed at its former location.
303
304 This function is useful for ensuring that all references to a
305 datasheet have been dropped, especially in conjunction with
306 tools like Valgrind. */
307 struct datasheet *
datasheet_rename(struct datasheet * ds)308 datasheet_rename (struct datasheet *ds)
309 {
310 struct datasheet *new = xmemdup (ds, sizeof *ds);
311 free (ds);
312 return new;
313 }
314
315 /* Returns true if datasheet DS is tainted.
316 A datasheet is tainted by an I/O error or by taint
317 propagation to the datasheet. */
318 bool
datasheet_error(const struct datasheet * ds)319 datasheet_error (const struct datasheet *ds)
320 {
321 return taint_is_tainted (ds->taint);
322 }
323
324 /* Marks datasheet DS tainted. */
325 void
datasheet_force_error(struct datasheet * ds)326 datasheet_force_error (struct datasheet *ds)
327 {
328 taint_set_taint (ds->taint);
329 }
330
331 /* Returns datasheet DS's taint object. */
332 const struct taint *
datasheet_get_taint(const struct datasheet * ds)333 datasheet_get_taint (const struct datasheet *ds)
334 {
335 return ds->taint;
336 }
337
338 /* Returns the number of rows in DS. */
339 casenumber
datasheet_get_n_rows(const struct datasheet * ds)340 datasheet_get_n_rows (const struct datasheet *ds)
341 {
342 return axis_get_size (ds->rows);
343 }
344
345 /* Returns the number of columns in DS. */
346 size_t
datasheet_get_n_columns(const struct datasheet * ds)347 datasheet_get_n_columns (const struct datasheet *ds)
348 {
349 return ds->n_columns;
350 }
351
352 /* Inserts a column of the given WIDTH into datasheet DS just
353 before column BEFORE. Initializes the contents of each row in
354 the inserted column to VALUE (which must have width WIDTH).
355
356 Returns true if successful, false on failure. In case of
357 failure, the datasheet is unchanged. */
358 bool
datasheet_insert_column(struct datasheet * ds,const union value * value,int width,size_t before)359 datasheet_insert_column (struct datasheet *ds,
360 const union value *value, int width, size_t before)
361 {
362 struct column *col;
363
364 assert (before <= ds->n_columns);
365
366 ds->columns = xnrealloc (ds->columns,
367 ds->n_columns + 1, sizeof *ds->columns);
368 insert_element (ds->columns, ds->n_columns, sizeof *ds->columns, before);
369 col = &ds->columns[before];
370 ds->n_columns++;
371
372 allocate_column (ds, width, col);
373
374 if (width >= 0 && !source_write_column (col, value))
375 {
376 datasheet_delete_columns (ds, before, 1);
377 taint_set_taint (ds->taint);
378 return false;
379 }
380
381 return true;
382 }
383
384 /* Deletes the N columns in DS starting from column START. */
385 void
datasheet_delete_columns(struct datasheet * ds,size_t start,size_t n)386 datasheet_delete_columns (struct datasheet *ds, size_t start, size_t n)
387 {
388 assert (start + n <= ds->n_columns);
389
390 if (n > 0)
391 {
392 size_t i;
393
394 for (i = start; i < start + n; i++)
395 {
396 struct column *column = &ds->columns[i];
397 struct source *source = column->source;
398 source_release_column (source, column->byte_ofs, column->width);
399 release_source (ds, source);
400 }
401
402 remove_range (ds->columns, ds->n_columns, sizeof *ds->columns, start, n);
403 ds->n_columns -= n;
404
405 caseproto_unref (ds->proto);
406 ds->proto = NULL;
407 }
408 }
409
410 /* Moves the N columns in DS starting at position OLD_START so
411 that they then start at position NEW_START. Equivalent to
412 deleting the column rows, then inserting them at what becomes
413 position NEW_START after the deletion. */
414 void
datasheet_move_columns(struct datasheet * ds,size_t old_start,size_t new_start,size_t n)415 datasheet_move_columns (struct datasheet *ds,
416 size_t old_start, size_t new_start,
417 size_t n)
418 {
419 assert (old_start + n <= ds->n_columns);
420 assert (new_start + n <= ds->n_columns);
421
422 move_range (ds->columns, ds->n_columns, sizeof *ds->columns,
423 old_start, new_start, n);
424
425 caseproto_unref (ds->proto);
426 ds->proto = NULL;
427 }
428
429 struct resize_datasheet_value_aux
430 {
431 union value src_value;
432 size_t src_ofs;
433 int src_width;
434
435 void (*resize_cb) (const union value *, union value *, const void *aux);
436 const void *resize_cb_aux;
437
438 union value dst_value;
439 size_t dst_ofs;
440 int dst_width;
441 };
442
443 static bool
resize_datasheet_value(const void * src,void * dst,void * aux_)444 resize_datasheet_value (const void *src, void *dst, void *aux_)
445 {
446 struct resize_datasheet_value_aux *aux = aux_;
447
448 memcpy (value_to_data (&aux->src_value, aux->src_width),
449 (uint8_t *) src + aux->src_ofs,
450 width_to_n_bytes (aux->src_width));
451
452 aux->resize_cb (&aux->src_value, &aux->dst_value, aux->resize_cb_aux);
453
454 memcpy ((uint8_t *) dst + aux->dst_ofs,
455 value_to_data (&aux->dst_value, aux->dst_width),
456 width_to_n_bytes (aux->dst_width));
457
458 return true;
459 }
460
461 bool
datasheet_resize_column(struct datasheet * ds,size_t column,int new_width,void (* resize_cb)(const union value *,union value *,const void * aux),const void * resize_cb_aux)462 datasheet_resize_column (struct datasheet *ds, size_t column, int new_width,
463 void (*resize_cb) (const union value *,
464 union value *, const void *aux),
465 const void *resize_cb_aux)
466 {
467 struct column old_col;
468 struct column *col;
469 int old_width;
470
471 assert (column < datasheet_get_n_columns (ds));
472
473 col = &ds->columns[column];
474 old_col = *col;
475 old_width = old_col.width;
476
477 if (new_width == -1)
478 {
479 if (old_width != -1)
480 {
481 datasheet_delete_columns (ds, column, 1);
482 datasheet_insert_column (ds, NULL, -1, column);
483 }
484 }
485 else if (old_width == -1)
486 {
487 union value value;
488 value_init (&value, new_width);
489 value_set_missing (&value, new_width);
490 if (resize_cb != NULL)
491 resize_cb (NULL, &value, resize_cb_aux);
492 datasheet_delete_columns (ds, column, 1);
493 datasheet_insert_column (ds, &value, new_width, column);
494 value_destroy (&value, new_width);
495 }
496 else if (source_has_backing (col->source))
497 {
498 unsigned long int n_rows = axis_get_size (ds->rows);
499 unsigned long int lrow;
500 union value src, dst;
501
502 source_release_column (col->source, col->byte_ofs, col->width);
503 allocate_column (ds, new_width, col);
504
505 value_init (&src, old_width);
506 value_init (&dst, new_width);
507 for (lrow = 0; lrow < n_rows; lrow++)
508 {
509 unsigned long int prow = axis_map (ds->rows, lrow);
510 if (!source_read (&old_col, prow, &src, 1))
511 {
512 /* FIXME: back out col changes. */
513 break;
514 }
515 resize_cb (&src, &dst, resize_cb_aux);
516 if (!source_write (col, prow, &dst, 1))
517 {
518 /* FIXME: back out col changes. */
519 break;
520 }
521 }
522 value_destroy (&src, old_width);
523 value_destroy (&dst, new_width);
524 if (lrow < n_rows)
525 return false;
526
527 release_source (ds, old_col.source);
528 }
529 else
530 {
531 struct resize_datasheet_value_aux aux;
532
533 source_release_column (col->source, col->byte_ofs, col->width);
534 allocate_column (ds, new_width, col);
535
536 value_init (&aux.src_value, old_col.width);
537 aux.src_ofs = old_col.byte_ofs;
538 aux.src_width = old_col.width;
539 aux.resize_cb = resize_cb;
540 aux.resize_cb_aux = resize_cb_aux;
541 value_init (&aux.dst_value, new_width);
542 aux.dst_ofs = col->byte_ofs;
543 aux.dst_width = new_width;
544 sparse_xarray_copy (old_col.source->data, col->source->data,
545 resize_datasheet_value, &aux);
546 value_destroy (&aux.src_value, old_width);
547 value_destroy (&aux.dst_value, new_width);
548
549 release_source (ds, old_col.source);
550 }
551 return true;
552 }
553
554 /* Retrieves and returns the contents of the given ROW in
555 datasheet DS. The caller owns the returned case and must
556 unref it when it is no longer needed. Returns a null pointer
557 on I/O error. */
558 struct ccase *
datasheet_get_row(const struct datasheet * ds,casenumber row)559 datasheet_get_row (const struct datasheet *ds, casenumber row)
560 {
561 size_t n_columns = datasheet_get_n_columns (ds);
562 struct ccase *c = case_create (datasheet_get_proto (ds));
563 if (rw_case (CONST_CAST (struct datasheet *, ds), OP_READ,
564 row, 0, n_columns, case_data_all_rw (c)))
565 return c;
566 else
567 {
568 case_unref (c);
569 return NULL;
570 }
571 }
572
573 /* Stores the contents of case C, which is destroyed, into the
574 given ROW in DS. Returns true on success, false on I/O error.
575 On failure, the given ROW might be partially modified or
576 corrupted. */
577 bool
datasheet_put_row(struct datasheet * ds,casenumber row,struct ccase * c)578 datasheet_put_row (struct datasheet *ds, casenumber row, struct ccase *c)
579 {
580 size_t n_columns = datasheet_get_n_columns (ds);
581 bool ok = rw_case (ds, OP_WRITE, row, 0, n_columns,
582 (union value *) case_data_all (c));
583 case_unref (c);
584 return ok;
585 }
586
587 /* Stores the values of COLUMN in DS in the given ROW in DS into
588 VALUE. The caller must have already initialized VALUE as a
589 value of the appropriate width (as returned by
590 datasheet_get_column_width (DS, COLUMN)). Returns true if
591 successful, false on I/O error. */
592 bool
datasheet_get_value(const struct datasheet * ds,casenumber row,size_t column,union value * value)593 datasheet_get_value (const struct datasheet *ds, casenumber row,
594 size_t column, union value *value)
595 {
596 assert (row >= 0);
597 return rw_case (CONST_CAST (struct datasheet *, ds), OP_READ,
598 row, column, 1, value);
599 }
600
601 /* Stores VALUE into DS in the given ROW and COLUMN. VALUE must
602 have the correct width for COLUMN (as returned by
603 datasheet_get_column_width (DS, COLUMN)). Returns true if
604 successful, false on I/O error. On failure, ROW might be
605 partially modified or corrupted. */
606 bool
datasheet_put_value(struct datasheet * ds,casenumber row,size_t column,const union value * value)607 datasheet_put_value (struct datasheet *ds, casenumber row,
608 size_t column, const union value *value)
609 {
610 return rw_case (ds, OP_WRITE, row, column, 1, (union value *) value);
611 }
612
613 /* Inserts the CNT cases at C into datasheet DS just before row
614 BEFORE. Returns true if successful, false on I/O error. On
615 failure, datasheet DS is not modified.
616
617 Regardless of success, this function unrefs all of the cases
618 in C. */
619 bool
datasheet_insert_rows(struct datasheet * ds,casenumber before,struct ccase * c[],casenumber cnt)620 datasheet_insert_rows (struct datasheet *ds,
621 casenumber before, struct ccase *c[],
622 casenumber cnt)
623 {
624 casenumber added = 0;
625 while (cnt > 0)
626 {
627 unsigned long first_phy;
628 unsigned long phy_cnt;
629 unsigned long i;
630
631 /* Allocate physical rows from the pool of available
632 rows. */
633 if (!axis_allocate (ds->rows, cnt, &first_phy, &phy_cnt))
634 {
635 /* No rows were available. Extend the row axis to make
636 some new ones available. */
637 phy_cnt = cnt;
638 first_phy = axis_extend (ds->rows, cnt);
639 }
640
641 /* Insert the new rows into the row mapping. */
642 axis_insert (ds->rows, before, first_phy, phy_cnt);
643
644 /* Initialize the new rows. */
645 for (i = 0; i < phy_cnt; i++)
646 if (!datasheet_put_row (ds, before + i, c[i]))
647 {
648 while (++i < cnt)
649 case_unref (c[i]);
650 datasheet_delete_rows (ds, before - added, phy_cnt + added);
651 return false;
652 }
653
654 /* Advance. */
655 c += phy_cnt;
656 cnt -= phy_cnt;
657 before += phy_cnt;
658 added += phy_cnt;
659 }
660 return true;
661 }
662
663 /* Deletes the CNT rows in DS starting from row FIRST. */
664 void
datasheet_delete_rows(struct datasheet * ds,casenumber first,casenumber cnt)665 datasheet_delete_rows (struct datasheet *ds,
666 casenumber first, casenumber cnt)
667 {
668 size_t lrow;
669
670 /* Free up rows for reuse.
671 FIXME: optimize. */
672 for (lrow = first; lrow < first + cnt; lrow++)
673 axis_make_available (ds->rows, axis_map (ds->rows, lrow), 1);
674
675 /* Remove rows from logical-to-physical mapping. */
676 axis_remove (ds->rows, first, cnt);
677 }
678
679 /* Moves the CNT rows in DS starting at position OLD_START so
680 that they then start at position NEW_START. Equivalent to
681 deleting the given rows, then inserting them at what becomes
682 position NEW_START after the deletion. */
683 void
datasheet_move_rows(struct datasheet * ds,size_t old_start,size_t new_start,size_t cnt)684 datasheet_move_rows (struct datasheet *ds,
685 size_t old_start, size_t new_start,
686 size_t cnt)
687 {
688 axis_move (ds->rows, old_start, new_start, cnt);
689 }
690
691 static const struct casereader_random_class datasheet_reader_class;
692
693 /* Creates and returns a casereader whose input cases are the
694 rows in datasheet DS. From the caller's perspective, DS is
695 effectively destroyed by this operation, such that the caller
696 must not reference it again. */
697 struct casereader *
datasheet_make_reader(struct datasheet * ds)698 datasheet_make_reader (struct datasheet *ds)
699 {
700 struct casereader *reader;
701 ds = datasheet_rename (ds);
702 reader = casereader_create_random (datasheet_get_proto (ds),
703 datasheet_get_n_rows (ds),
704 &datasheet_reader_class, ds);
705 taint_propagate (datasheet_get_taint (ds), casereader_get_taint (reader));
706 return reader;
707 }
708
709 /* "read" function for the datasheet random casereader. */
710 static struct ccase *
datasheet_reader_read(struct casereader * reader UNUSED,void * ds_,casenumber case_idx)711 datasheet_reader_read (struct casereader *reader UNUSED, void *ds_,
712 casenumber case_idx)
713 {
714 struct datasheet *ds = ds_;
715 if (case_idx < datasheet_get_n_rows (ds))
716 {
717 struct ccase *c = datasheet_get_row (ds, case_idx);
718 if (c == NULL)
719 taint_set_taint (ds->taint);
720 return c;
721 }
722 else
723 return NULL;
724 }
725
726 /* "destroy" function for the datasheet random casereader. */
727 static void
datasheet_reader_destroy(struct casereader * reader UNUSED,void * ds_)728 datasheet_reader_destroy (struct casereader *reader UNUSED, void *ds_)
729 {
730 struct datasheet *ds = ds_;
731 datasheet_destroy (ds);
732 }
733
734 /* "advance" function for the datasheet random casereader. */
735 static void
datasheet_reader_advance(struct casereader * reader UNUSED,void * ds_,casenumber case_cnt)736 datasheet_reader_advance (struct casereader *reader UNUSED, void *ds_,
737 casenumber case_cnt)
738 {
739 struct datasheet *ds = ds_;
740 datasheet_delete_rows (ds, 0, case_cnt);
741 }
742
743 /* Random casereader class for a datasheet. */
744 static const struct casereader_random_class datasheet_reader_class =
745 {
746 datasheet_reader_read,
747 datasheet_reader_destroy,
748 datasheet_reader_advance,
749 };
750
751 static void
allocate_column(struct datasheet * ds,int width,struct column * column)752 allocate_column (struct datasheet *ds, int width, struct column *column)
753 {
754 caseproto_unref (ds->proto);
755 ds->proto = NULL;
756
757 column->value_ofs = -1;
758 column->width = width;
759 if (width >= 0)
760 {
761 int n_bytes;
762 size_t i;
763
764 n_bytes = width_to_n_bytes (width);
765 for (i = 0; i < ds->n_sources; i++)
766 {
767 column->source = ds->sources[i];
768 column->byte_ofs = source_allocate_column (column->source, n_bytes);
769 if (column->byte_ofs >= 0)
770 return;
771 }
772
773 column->source = source_create_empty (MAX (n_bytes,
774 ds->column_min_alloc));
775 ds->sources = xnrealloc (ds->sources,
776 ds->n_sources + 1, sizeof *ds->sources);
777 ds->sources[ds->n_sources++] = column->source;
778
779 ds->column_min_alloc = MIN (65536, ds->column_min_alloc * 2);
780
781 column->byte_ofs = source_allocate_column (column->source, n_bytes);
782 assert (column->byte_ofs >= 0);
783 }
784 else
785 {
786 column->source = NULL;
787 column->byte_ofs = -1;
788 }
789 }
790
791 static void
release_source(struct datasheet * ds,struct source * source)792 release_source (struct datasheet *ds, struct source *source)
793 {
794 if (source_has_backing (source) && !source_in_use (source))
795 {
796 /* Since only the first source to be added ever
797 has a backing, this source must have index
798 0. */
799 assert (source == ds->sources[0]);
800 ds->sources[0] = ds->sources[--ds->n_sources];
801 source_destroy (source);
802 }
803 }
804
805 /* Reads (if OP is OP_READ) or writes (if op is OP_WRITE) the
806 N_COLUMNS columns starting from column START_COLUMN in row
807 LROW to/from the N_COLUMNS values in DATA. */
808 static bool
rw_case(struct datasheet * ds,enum rw_op op,casenumber lrow,size_t start_column,size_t n_columns,union value data[])809 rw_case (struct datasheet *ds, enum rw_op op,
810 casenumber lrow, size_t start_column, size_t n_columns,
811 union value data[])
812 {
813 struct column *columns = &ds->columns[start_column];
814 casenumber prow;
815 size_t i;
816
817 assert (lrow < datasheet_get_n_rows (ds));
818 assert (n_columns <= datasheet_get_n_columns (ds));
819 assert (start_column + n_columns <= datasheet_get_n_columns (ds));
820
821 prow = axis_map (ds->rows, lrow);
822 for (i = 0; i < n_columns;)
823 {
824 struct source *source = columns[i].source;
825 size_t j;
826 bool ok;
827
828 if (columns[i].width < 0)
829 {
830 i++;
831 continue;
832 }
833
834 for (j = i + 1; j < n_columns; j++)
835 if (columns[j].width < 0 || columns[j].source != source)
836 break;
837
838 if (op == OP_READ)
839 ok = source_read (&columns[i], prow, &data[i], j - i);
840 else
841 ok = source_write (&columns[i], prow, &data[i], j - i);
842
843 if (!ok)
844 {
845 taint_set_taint (ds->taint);
846 return false;
847 }
848
849 i = j;
850 }
851 return true;
852 }
853
854 /* An axis.
855
856 An axis has two functions. First, it maintains a mapping from
857 logical (client-visible) to physical (storage) ordinates. The
858 axis_map and axis_get_size functions inspect this mapping, and
859 the axis_insert, axis_remove, and axis_move functions modify
860 it. Second, it tracks the set of ordinates that are unused
861 and available for reuse. The axis_allocate,
862 axis_make_available, and axis_extend functions affect the set
863 of available ordinates. */
864 struct axis
865 {
866 struct tower log_to_phy; /* Map from logical to physical ordinates;
867 contains "struct axis_group"s. */
868 struct range_set *available; /* Set of unused, available ordinates. */
869 unsigned long int phy_size; /* Current physical length of axis. */
870 };
871
872 /* A mapping from logical to physical ordinates. */
873 struct axis_group
874 {
875 struct tower_node logical; /* Range of logical ordinates. */
876 unsigned long phy_start; /* First corresponding physical ordinate. */
877 };
878
879 static struct axis_group *axis_group_from_tower_node (struct tower_node *);
880 static struct tower_node *make_axis_group (unsigned long int phy_start);
881 static struct tower_node *split_axis (struct axis *, unsigned long int where);
882 static void merge_axis_nodes (struct axis *, struct tower_node *,
883 struct tower_node **other_node);
884 static void check_axis_merged (const struct axis *axis UNUSED);
885
886 /* Creates and returns a new, initially empty axis. */
887 static struct axis *
axis_create(void)888 axis_create (void)
889 {
890 struct axis *axis = xmalloc (sizeof *axis);
891 tower_init (&axis->log_to_phy);
892 axis->available = range_set_create ();
893 axis->phy_size = 0;
894 return axis;
895 }
896
897 /* Returns a clone of existing axis OLD.
898
899 Currently this is used only by the datasheet model checker
900 driver, but it could be otherwise useful. */
901 static struct axis *
axis_clone(const struct axis * old)902 axis_clone (const struct axis *old)
903 {
904 const struct tower_node *node;
905 struct axis *new;
906
907 new = xmalloc (sizeof *new);
908 tower_init (&new->log_to_phy);
909 new->available = range_set_clone (old->available, NULL);
910 new->phy_size = old->phy_size;
911
912 for (node = tower_first (&old->log_to_phy); node != NULL;
913 node = tower_next (&old->log_to_phy, node))
914 {
915 unsigned long int size = tower_node_get_size (node);
916 struct axis_group *group = tower_data (node, struct axis_group, logical);
917 tower_insert (&new->log_to_phy, size, make_axis_group (group->phy_start),
918 NULL);
919 }
920
921 return new;
922 }
923
924 /* Adds the state of AXIS to the MD4 hash context CTX.
925
926 This is only used by the datasheet model checker driver. It
927 is unlikely to be otherwise useful. */
928 static void
axis_hash(const struct axis * axis,struct md4_ctx * ctx)929 axis_hash (const struct axis *axis, struct md4_ctx *ctx)
930 {
931 const struct tower_node *tn;
932 const struct range_set_node *rsn;
933
934 for (tn = tower_first (&axis->log_to_phy); tn != NULL;
935 tn = tower_next (&axis->log_to_phy, tn))
936 {
937 struct axis_group *group = tower_data (tn, struct axis_group, logical);
938 unsigned long int phy_start = group->phy_start;
939 unsigned long int size = tower_node_get_size (tn);
940
941 md4_process_bytes (&phy_start, sizeof phy_start, ctx);
942 md4_process_bytes (&size, sizeof size, ctx);
943 }
944
945 RANGE_SET_FOR_EACH (rsn, axis->available)
946 {
947 unsigned long int start = range_set_node_get_start (rsn);
948 unsigned long int end = range_set_node_get_end (rsn);
949
950 md4_process_bytes (&start, sizeof start, ctx);
951 md4_process_bytes (&end, sizeof end, ctx);
952 }
953
954 md4_process_bytes (&axis->phy_size, sizeof axis->phy_size, ctx);
955 }
956
957 /* Destroys AXIS. */
958 static void
axis_destroy(struct axis * axis)959 axis_destroy (struct axis *axis)
960 {
961 if (axis == NULL)
962 return;
963
964 while (!tower_is_empty (&axis->log_to_phy))
965 {
966 struct tower_node *node = tower_first (&axis->log_to_phy);
967 struct axis_group *group = tower_data (node, struct axis_group,
968 logical);
969 tower_delete (&axis->log_to_phy, node);
970 free (group);
971 }
972
973 range_set_destroy (axis->available);
974 free (axis);
975 }
976
977 /* Allocates up to REQUEST contiguous unused and available
978 ordinates from AXIS. If successful, stores the number
979 obtained into *WIDTH and the ordinate of the first into
980 *START, marks the ordinates as now unavailable return true.
981 On failure, which occurs only if AXIS has no unused and
982 available ordinates, returns false without modifying AXIS. */
983 static bool
axis_allocate(struct axis * axis,unsigned long int request,unsigned long int * start,unsigned long int * width)984 axis_allocate (struct axis *axis, unsigned long int request,
985 unsigned long int *start, unsigned long int *width)
986 {
987 return range_set_allocate (axis->available, request, start, width);
988 }
989
990 /* Marks the WIDTH contiguous ordinates in AXIS, starting from
991 START, as unused and available. */
992 static void
axis_make_available(struct axis * axis,unsigned long int start,unsigned long int width)993 axis_make_available (struct axis *axis,
994 unsigned long int start, unsigned long int width)
995 {
996 range_set_set1 (axis->available, start, width);
997 }
998
999 /* Extends the total physical length of AXIS by WIDTH and returns
1000 the first ordinate in the new physical region. */
1001 static unsigned long int
axis_extend(struct axis * axis,unsigned long int width)1002 axis_extend (struct axis *axis, unsigned long int width)
1003 {
1004 unsigned long int start = axis->phy_size;
1005 axis->phy_size += width;
1006 return start;
1007 }
1008
1009 /* Returns the physical ordinate in AXIS corresponding to logical
1010 ordinate LOG_POS. LOG_POS must be less than the logical
1011 length of AXIS. */
1012 static unsigned long int
axis_map(const struct axis * axis,unsigned long log_pos)1013 axis_map (const struct axis *axis, unsigned long log_pos)
1014 {
1015 struct tower_node *node;
1016 struct axis_group *group;
1017 unsigned long int group_start;
1018
1019 node = tower_lookup (&axis->log_to_phy, log_pos, &group_start);
1020 group = tower_data (node, struct axis_group, logical);
1021 return group->phy_start + (log_pos - group_start);
1022 }
1023
1024 /* Returns the logical length of AXIS. */
1025 static unsigned long
axis_get_size(const struct axis * axis)1026 axis_get_size (const struct axis *axis)
1027 {
1028 return tower_height (&axis->log_to_phy);
1029 }
1030
1031 /* Inserts the CNT contiguous physical ordinates starting at
1032 PHY_START into AXIS's logical-to-physical mapping, starting at
1033 logical position LOG_START. */
1034 static void
axis_insert(struct axis * axis,unsigned long int log_start,unsigned long int phy_start,unsigned long int cnt)1035 axis_insert (struct axis *axis,
1036 unsigned long int log_start, unsigned long int phy_start,
1037 unsigned long int cnt)
1038 {
1039 struct tower_node *before = split_axis (axis, log_start);
1040 struct tower_node *new = make_axis_group (phy_start);
1041 tower_insert (&axis->log_to_phy, cnt, new, before);
1042 merge_axis_nodes (axis, new, NULL);
1043 check_axis_merged (axis);
1044 }
1045
1046 /* Removes CNT ordinates from AXIS's logical-to-physical mapping
1047 starting at logical position START. */
1048 static void
axis_remove(struct axis * axis,unsigned long int start,unsigned long int cnt)1049 axis_remove (struct axis *axis,
1050 unsigned long int start, unsigned long int cnt)
1051 {
1052 if (cnt > 0)
1053 {
1054 struct tower_node *last = split_axis (axis, start + cnt);
1055 struct tower_node *cur, *next;
1056 for (cur = split_axis (axis, start); cur != last; cur = next)
1057 {
1058 next = tower_delete (&axis->log_to_phy, cur);
1059 free (axis_group_from_tower_node (cur));
1060 }
1061 merge_axis_nodes (axis, last, NULL);
1062 check_axis_merged (axis);
1063 }
1064 }
1065
1066 /* Moves the CNT ordinates in AXIS's logical-to-mapping starting
1067 at logical position OLD_START so that they then start at
1068 position NEW_START. */
1069 static void
axis_move(struct axis * axis,unsigned long int old_start,unsigned long int new_start,unsigned long int cnt)1070 axis_move (struct axis *axis,
1071 unsigned long int old_start, unsigned long int new_start,
1072 unsigned long int cnt)
1073 {
1074 if (cnt > 0 && old_start != new_start)
1075 {
1076 struct tower_node *old_first, *old_last, *new_first;
1077 struct tower_node *merge1, *merge2;
1078 struct tower tmp_array;
1079
1080 /* Move ordinates OLD_START...(OLD_START + CNT) into new,
1081 separate TMP_ARRAY. */
1082 old_first = split_axis (axis, old_start);
1083 old_last = split_axis (axis, old_start + cnt);
1084 tower_init (&tmp_array);
1085 tower_splice (&tmp_array, NULL,
1086 &axis->log_to_phy, old_first, old_last);
1087 merge_axis_nodes (axis, old_last, NULL);
1088 check_axis_merged (axis);
1089
1090 /* Move TMP_ARRAY to position NEW_START. */
1091 new_first = split_axis (axis, new_start);
1092 merge1 = tower_first (&tmp_array);
1093 merge2 = tower_last (&tmp_array);
1094 if (merge2 == merge1)
1095 merge2 = NULL;
1096 tower_splice (&axis->log_to_phy, new_first, &tmp_array, old_first, NULL);
1097 merge_axis_nodes (axis, merge1, &merge2);
1098 merge_axis_nodes (axis, merge2, NULL);
1099 check_axis_merged (axis);
1100 }
1101 }
1102
1103 /* Returns the axis_group in which NODE is embedded. */
1104 static struct axis_group *
axis_group_from_tower_node(struct tower_node * node)1105 axis_group_from_tower_node (struct tower_node *node)
1106 {
1107 return tower_data (node, struct axis_group, logical);
1108 }
1109
1110 /* Creates and returns a new axis_group at physical position
1111 PHY_START. */
1112 static struct tower_node *
make_axis_group(unsigned long phy_start)1113 make_axis_group (unsigned long phy_start)
1114 {
1115 struct axis_group *group = xmalloc (sizeof *group);
1116 group->phy_start = phy_start;
1117 return &group->logical;
1118 }
1119
1120 /* Returns the tower_node in AXIS's logical-to-physical map whose
1121 bottom edge is at exact level WHERE. If there is no such
1122 tower_node in AXIS's logical-to-physical map, then split_axis
1123 creates one by breaking an existing tower_node into two
1124 separate ones, unless WHERE is equal to the tower height, in
1125 which case it simply returns a null pointer. */
1126 static struct tower_node *
split_axis(struct axis * axis,unsigned long int where)1127 split_axis (struct axis *axis, unsigned long int where)
1128 {
1129 unsigned long int group_start;
1130 struct tower_node *group_node;
1131 struct axis_group *group;
1132
1133 assert (where <= tower_height (&axis->log_to_phy));
1134 if (where >= tower_height (&axis->log_to_phy))
1135 return NULL;
1136
1137 group_node = tower_lookup (&axis->log_to_phy, where, &group_start);
1138 group = axis_group_from_tower_node (group_node);
1139 if (where > group_start)
1140 {
1141 unsigned long int size_1 = where - group_start;
1142 unsigned long int size_2 = tower_node_get_size (group_node) - size_1;
1143 struct tower_node *next = tower_next (&axis->log_to_phy, group_node);
1144 struct tower_node *new = make_axis_group (group->phy_start + size_1);
1145 tower_resize (&axis->log_to_phy, group_node, size_1);
1146 tower_insert (&axis->log_to_phy, size_2, new, next);
1147 return new;
1148 }
1149 else
1150 return &group->logical;
1151 }
1152
1153 /* Within AXIS, attempts to merge NODE (or the last node in AXIS,
1154 if NODE is null) with its neighbor nodes. This is possible
1155 when logically adjacent nodes are also adjacent physically (in
1156 the same order).
1157
1158 When a merge occurs, and OTHER_NODE is non-null and points to
1159 the node to be deleted, this function also updates
1160 *OTHER_NODE, if necessary, to ensure that it remains a valid
1161 pointer. */
1162 static void
merge_axis_nodes(struct axis * axis,struct tower_node * node,struct tower_node ** other_node)1163 merge_axis_nodes (struct axis *axis, struct tower_node *node,
1164 struct tower_node **other_node)
1165 {
1166 struct tower *t = &axis->log_to_phy;
1167 struct axis_group *group;
1168 struct tower_node *next, *prev;
1169
1170 /* Find node to potentially merge with neighbors. */
1171 if (node == NULL)
1172 node = tower_last (t);
1173 if (node == NULL)
1174 return;
1175 group = axis_group_from_tower_node (node);
1176
1177 /* Try to merge NODE with successor. */
1178 next = tower_next (t, node);
1179 if (next != NULL)
1180 {
1181 struct axis_group *next_group = axis_group_from_tower_node (next);
1182 unsigned long this_height = tower_node_get_size (node);
1183
1184 if (group->phy_start + this_height == next_group->phy_start)
1185 {
1186 unsigned long next_height = tower_node_get_size (next);
1187 tower_resize (t, node, this_height + next_height);
1188 if (other_node != NULL && *other_node == next)
1189 *other_node = tower_next (t, *other_node);
1190 tower_delete (t, next);
1191 free (next_group);
1192 }
1193 }
1194
1195 /* Try to merge NODE with predecessor. */
1196 prev = tower_prev (t, node);
1197 if (prev != NULL)
1198 {
1199 struct axis_group *prev_group = axis_group_from_tower_node (prev);
1200 unsigned long prev_height = tower_node_get_size (prev);
1201
1202 if (prev_group->phy_start + prev_height == group->phy_start)
1203 {
1204 unsigned long this_height = tower_node_get_size (node);
1205 group->phy_start = prev_group->phy_start;
1206 tower_resize (t, node, this_height + prev_height);
1207 if (other_node != NULL && *other_node == prev)
1208 *other_node = tower_next (t, *other_node);
1209 tower_delete (t, prev);
1210 free (prev_group);
1211 }
1212 }
1213 }
1214
1215 /* Verify that all potentially merge-able nodes in AXIS are
1216 actually merged. */
1217 static void
check_axis_merged(const struct axis * axis UNUSED)1218 check_axis_merged (const struct axis *axis UNUSED)
1219 {
1220 #if ASSERT_LEVEL >= 10
1221 struct tower_node *prev, *node;
1222
1223 for (prev = NULL, node = tower_first (&axis->log_to_phy); node != NULL;
1224 prev = node, node = tower_next (&axis->log_to_phy, node))
1225 if (prev != NULL)
1226 {
1227 struct axis_group *prev_group = axis_group_from_tower_node (prev);
1228 unsigned long prev_height = tower_node_get_size (prev);
1229 struct axis_group *node_group = axis_group_from_tower_node (node);
1230 assert (prev_group->phy_start + prev_height != node_group->phy_start);
1231 }
1232 #endif
1233 }
1234
1235 /* A source. */
1236
1237 /* Creates and returns an empty, unbacked source with N_BYTES
1238 bytes per case, none of which are initially in use. */
1239 static struct source *
source_create_empty(size_t n_bytes)1240 source_create_empty (size_t n_bytes)
1241 {
1242 struct source *source = xmalloc (sizeof *source);
1243 size_t row_size = n_bytes + 4 * sizeof (void *);
1244 size_t max_memory_rows = settings_get_workspace () / row_size;
1245 source->avail = range_set_create ();
1246 range_set_set1 (source->avail, 0, n_bytes);
1247 source->data = sparse_xarray_create (n_bytes, MAX (max_memory_rows, 4));
1248 source->backing = NULL;
1249 source->backing_rows = 0;
1250 source->n_used = 0;
1251 return source;
1252 }
1253
1254 /* Creates and returns a new source backed by READER and with the
1255 same initial dimensions and content. */
1256 static struct source *
source_create_casereader(struct casereader * reader)1257 source_create_casereader (struct casereader *reader)
1258 {
1259 const struct caseproto *proto = casereader_get_proto (reader);
1260 size_t n_bytes = caseproto_to_n_bytes (proto);
1261 struct source *source = source_create_empty (n_bytes);
1262 size_t n_columns;
1263 size_t i;
1264
1265 range_set_set0 (source->avail, 0, n_bytes);
1266 source->backing = reader;
1267 source->backing_rows = casereader_count_cases (reader);
1268
1269 source->n_used = 0;
1270 n_columns = caseproto_get_n_widths (proto);
1271 for (i = 0; i < n_columns; i++)
1272 if (caseproto_get_width (proto, i) >= 0)
1273 source->n_used++;
1274
1275 return source;
1276 }
1277
1278 /* Returns a clone of source OLD with the same data and backing
1279 (if any).
1280
1281 Currently this is used only by the datasheet model checker
1282 driver, but it could be otherwise useful. */
1283 static struct source *
source_clone(const struct source * old)1284 source_clone (const struct source *old)
1285 {
1286 struct source *new = xmalloc (sizeof *new);
1287 new->avail = range_set_clone (old->avail, NULL);
1288 new->data = sparse_xarray_clone (old->data);
1289 new->backing = old->backing != NULL ? casereader_clone (old->backing) : NULL;
1290 new->backing_rows = old->backing_rows;
1291 new->n_used = old->n_used;
1292 if (new->data == NULL)
1293 {
1294 source_destroy (new);
1295 new = NULL;
1296 }
1297 return new;
1298 }
1299
1300 static int
source_allocate_column(struct source * source,int width)1301 source_allocate_column (struct source *source, int width)
1302 {
1303 unsigned long int start;
1304 int n_bytes;
1305
1306 assert (width >= 0);
1307 n_bytes = width_to_n_bytes (width);
1308 if (source->backing == NULL
1309 && range_set_allocate_fully (source->avail, n_bytes, &start))
1310 return start;
1311 else
1312 return -1;
1313 }
1314
1315 static void
source_release_column(struct source * source,int ofs,int width)1316 source_release_column (struct source *source, int ofs, int width)
1317 {
1318 assert (width >= 0);
1319 range_set_set1 (source->avail, ofs, width_to_n_bytes (width));
1320 if (source->backing != NULL)
1321 source->n_used--;
1322 }
1323
1324 /* Returns true if SOURCE has any columns in use,
1325 false otherwise. */
1326 static bool
source_in_use(const struct source * source)1327 source_in_use (const struct source *source)
1328 {
1329 return source->n_used > 0;
1330 }
1331
1332 /* Destroys SOURCE and its data and backing, if any. */
1333 static void
source_destroy(struct source * source)1334 source_destroy (struct source *source)
1335 {
1336 if (source != NULL)
1337 {
1338 range_set_destroy (source->avail);
1339 sparse_xarray_destroy (source->data);
1340 casereader_destroy (source->backing);
1341 free (source);
1342 }
1343 }
1344
1345 /* Returns the number of rows in SOURCE's backing casereader
1346 (SOURCE must have a backing casereader). */
1347 static casenumber
source_get_backing_n_rows(const struct source * source)1348 source_get_backing_n_rows (const struct source *source)
1349 {
1350 assert (source_has_backing (source));
1351 return source->backing_rows;
1352 }
1353
1354 /* Reads the N COLUMNS in the given ROW, into the N VALUES. Returns true if
1355 successful, false on I/O error.
1356
1357 All of the COLUMNS must have the same source.
1358
1359 The caller must have initialized VALUES with the proper width. */
1360 static bool
source_read(const struct column columns[],casenumber row,union value values[],size_t n)1361 source_read (const struct column columns[], casenumber row,
1362 union value values[], size_t n)
1363 {
1364 struct source *source = columns[0].source;
1365 size_t i;
1366
1367 if (source->backing == NULL
1368 || sparse_xarray_contains_row (source->data, row))
1369 {
1370 bool ok = true;
1371
1372 for (i = 0; i < n && ok; i++)
1373 ok = sparse_xarray_read (source->data, row, columns[i].byte_ofs,
1374 width_to_n_bytes (columns[i].width),
1375 value_to_data (&values[i], columns[i].width));
1376 return ok;
1377 }
1378 else
1379 {
1380 struct ccase *c = casereader_peek (source->backing, row);
1381 bool ok = c != NULL;
1382 if (ok)
1383 {
1384 for (i = 0; i < n; i++)
1385 value_copy (&values[i], case_data_idx (c, columns[i].value_ofs),
1386 columns[i].width);
1387 case_unref (c);
1388 }
1389 return ok;
1390 }
1391 }
1392
1393 static bool
copy_case_into_source(struct source * source,struct ccase * c,casenumber row)1394 copy_case_into_source (struct source *source, struct ccase *c, casenumber row)
1395 {
1396 const struct caseproto *proto = casereader_get_proto (source->backing);
1397 size_t n_widths = caseproto_get_n_widths (proto);
1398 size_t ofs;
1399 size_t i;
1400
1401 ofs = 0;
1402 for (i = 0; i < n_widths; i++)
1403 {
1404 int width = caseproto_get_width (proto, i);
1405 if (width >= 0)
1406 {
1407 int n_bytes = width_to_n_bytes (width);
1408 if (!sparse_xarray_write (source->data, row, ofs, n_bytes,
1409 value_to_data (case_data_idx (c, i),
1410 width)))
1411 return false;
1412 ofs += n_bytes;
1413 }
1414 }
1415 return true;
1416 }
1417
1418 /* Writes the N VALUES to their source in the given ROW and COLUMNS. Returns
1419 true if successful, false on I/O error. On error, the row's data may be
1420 completely or partially corrupted, both inside and outside the region to be
1421 written.
1422
1423 All of the COLUMNS must have the same source. */
1424 static bool
source_write(const struct column columns[],casenumber row,const union value values[],size_t n)1425 source_write (const struct column columns[], casenumber row,
1426 const union value values[], size_t n)
1427 {
1428 struct source *source = columns[0].source;
1429 struct casereader *backing = source->backing;
1430 size_t i;
1431
1432 if (backing != NULL
1433 && !sparse_xarray_contains_row (source->data, row)
1434 && row < source->backing_rows)
1435 {
1436 struct ccase *c;
1437 bool ok;
1438
1439 c = casereader_peek (backing, row);
1440 if (c == NULL)
1441 return false;
1442
1443 ok = copy_case_into_source (source, c, row);
1444 case_unref (c);
1445 if (!ok)
1446 return false;
1447 }
1448
1449 for (i = 0; i < n; i++)
1450 if (!sparse_xarray_write (source->data, row, columns[i].byte_ofs,
1451 width_to_n_bytes (columns[i].width),
1452 value_to_data (&values[i], columns[i].width)))
1453 return false;
1454 return true;
1455 }
1456
1457 /* Within SOURCE, which must not have a backing casereader,
1458 writes the VALUE_CNT values in VALUES_CNT to the VALUE_CNT
1459 columns starting from START_COLUMN, in every row, even in rows
1460 not yet otherwise initialized. Returns true if successful,
1461 false if an I/O error occurs.
1462
1463 We don't support backing != NULL because (1) it's harder and
1464 (2) this function is only called by
1465 datasheet_insert_column, which doesn't reuse columns from
1466 sources that are backed by casereaders. */
1467 static bool
source_write_column(struct column * column,const union value * value)1468 source_write_column (struct column *column, const union value *value)
1469 {
1470 int width = column->width;
1471
1472 assert (column->source->backing == NULL);
1473 assert (width >= 0);
1474
1475 return sparse_xarray_write_columns (column->source->data, column->byte_ofs,
1476 width_to_n_bytes (width),
1477 value_to_data (value, width));
1478 }
1479
1480 /* Returns true if SOURCE has a backing casereader, false
1481 otherwise. */
1482 static bool
source_has_backing(const struct source * source)1483 source_has_backing (const struct source *source)
1484 {
1485 return source->backing != NULL;
1486 }
1487
1488 /* Datasheet model checker test driver. */
1489
1490 static int
get_source_index(const struct datasheet * ds,const struct source * source)1491 get_source_index (const struct datasheet *ds, const struct source *source)
1492 {
1493 size_t i;
1494
1495 for (i = 0; i < ds->n_sources; i++)
1496 if (ds->sources[i] == source)
1497 return i;
1498 NOT_REACHED ();
1499 }
1500
1501 /* Clones the structure and contents of ODS into a new datasheet,
1502 and returns the new datasheet. */
1503 struct datasheet *
clone_datasheet(const struct datasheet * ods)1504 clone_datasheet (const struct datasheet *ods)
1505 {
1506 struct datasheet *ds;
1507 size_t i;
1508
1509 ds = xmalloc (sizeof *ds);
1510
1511 ds->sources = xmalloc (ods->n_sources * sizeof *ds->sources);
1512 for (i = 0; i < ods->n_sources; i++)
1513 ds->sources[i] = source_clone (ods->sources[i]);
1514 ds->n_sources = ods->n_sources;
1515
1516 ds->proto = ods->proto != NULL ? caseproto_ref (ods->proto) : NULL;
1517 ds->columns = xmemdup (ods->columns, ods->n_columns * sizeof *ods->columns);
1518 for (i = 0; i < ods->n_columns; i++)
1519 ds->columns[i].source
1520 = ds->sources[get_source_index (ods, ods->columns[i].source)];
1521 ds->n_columns = ods->n_columns;
1522 ds->column_min_alloc = ods->column_min_alloc;
1523
1524 ds->rows = axis_clone (ods->rows);
1525
1526 ds->taint = taint_create ();
1527
1528 return ds;
1529 }
1530
1531 /* Hashes the structure of datasheet DS and returns the hash.
1532 We use MD4 because it is much faster than MD5 or SHA-1 but its
1533 collision resistance is just as good. */
1534 unsigned int
hash_datasheet(const struct datasheet * ds)1535 hash_datasheet (const struct datasheet *ds)
1536 {
1537 unsigned int hash[DIV_RND_UP (20, sizeof (unsigned int))];
1538 struct md4_ctx ctx;
1539 size_t i;
1540
1541 md4_init_ctx (&ctx);
1542 for (i = 0; i < ds->n_columns; i++)
1543 {
1544 const struct column *column = &ds->columns[i];
1545 int source_n_bytes = sparse_xarray_get_n_columns (column->source->data);
1546 md4_process_bytes (&source_n_bytes, sizeof source_n_bytes, &ctx);
1547 /*md4_process_bytes (&column->byte_ofs, sizeof column->byte_ofs, &ctx);*/
1548 md4_process_bytes (&column->value_ofs, sizeof column->value_ofs, &ctx);
1549 md4_process_bytes (&column->width, sizeof column->width, &ctx);
1550 }
1551 axis_hash (ds->rows, &ctx);
1552 md4_process_bytes (&ds->column_min_alloc, sizeof ds->column_min_alloc, &ctx);
1553 md4_finish_ctx (&ctx, hash);
1554 return hash[0];
1555 }
1556