1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/dataset.h"
20
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/misc.h"
40 #include "libpspp/str.h"
41 #include "libpspp/taint.h"
42 #include "libpspp/i18n.h"
43
44 #include "gl/minmax.h"
45 #include "gl/xalloc.h"
46
/* A dataset: a dictionary together with the data source, the pending
   transformations, and the procedure state that operate on it. */
struct dataset {
  /* A dataset is usually part of a session.  Within a session its name must
     be unique.  The name must either be a valid PSPP identifier or the empty
     string.  (It must be unique within the session even if it is the empty
     string; that is, there may only be a single dataset within a session with
     the empty string as its name.) */
  struct session *session;
  char *name;
  enum dataset_display display;

  /* Cases are read from source,
     their transformation variables are initialized,
     pass through permanent_trns_chain (which transforms them into
     the format described by permanent_dict),
     are written to sink,
     pass through temporary_trns_chain (which transforms them into
     the format described by dict),
     and are finally passed to the procedure. */
  struct casereader *source;
  struct caseinit *caseinit;
  struct trns_chain *permanent_trns_chain;
  struct dictionary *permanent_dict;
  struct casewriter *sink;
  struct trns_chain *temporary_trns_chain;
  struct dictionary *dict;

  /* If true, cases are discarded instead of being written to
     sink. */
  bool discard_output;

  /* The transformation chain that the next transformation will be
     added to.  Points to either permanent_trns_chain or
     temporary_trns_chain. */
  struct trns_chain *cur_trns_chain;

  /* The case map used to compact a case, if necessary;
     otherwise a null pointer. */
  struct case_map *compactor;

  /* Time at which proc was last invoked. */
  time_t last_proc_invocation;

  /* Cases just before ("lagging") the current one. */
  int n_lag;                    /* Number of cases to lag. */
  struct deque lag;             /* Deque of lagged cases. */
  struct ccase **lag_cases;     /* Lagged cases managed by deque. */

  /* Procedure data. */
  enum
    {
      PROC_COMMITTED,           /* No procedure in progress. */
      PROC_OPEN,                /* proc_open called, casereader still open. */
      PROC_CLOSED               /* casereader from proc_open destroyed,
                                   but proc_commit not yet called. */
    }
  proc_state;
  casenumber cases_written;     /* Cases output so far. */
  bool ok;                      /* Error status. */
  struct casereader_shim *shim; /* Shim on proc_open() casereader. */

  /* Client callbacks (may be null) and the auxiliary data passed to them. */
  const struct dataset_callbacks *callbacks;
  void *cb_data;

  /* Uniquely distinguishes datasets. */
  unsigned int seqno;
};
112
/* Helpers that invoke the client's callbacks, if any are set. */
static void dataset_changed__ (struct dataset *);
static void dataset_transformations_changed__ (struct dataset *,
                                               bool non_empty);

/* Helpers that append the implicit case-limit and FILTER
   transformations. */
static void add_case_limit_trns (struct dataset *ds);
static void add_filter_trns (struct dataset *ds);

/* Records the current time as the time of the last procedure. */
static void update_last_proc_invocation (struct dataset *ds);
121
122 static void
dict_callback(struct dictionary * d UNUSED,void * ds_)123 dict_callback (struct dictionary *d UNUSED, void *ds_)
124 {
125 struct dataset *ds = ds_;
126 dataset_changed__ (ds);
127 }
128
129 static void
dataset_create_finish__(struct dataset * ds,struct session * session)130 dataset_create_finish__ (struct dataset *ds, struct session *session)
131 {
132 static unsigned int seqno;
133
134 dict_set_change_callback (ds->dict, dict_callback, ds);
135 proc_cancel_all_transformations (ds);
136 dataset_set_session (ds, session);
137 ds->seqno = ++seqno;
138 }
139
140 /* Creates a new dataset named NAME, adds it to SESSION, and returns it. If
141 SESSION already contains a dataset named NAME, it is deleted and replaced.
142 The dataset initially has an empty dictionary and no data source. */
143 struct dataset *
dataset_create(struct session * session,const char * name)144 dataset_create (struct session *session, const char *name)
145 {
146 struct dataset *ds;
147
148 ds = xzalloc (sizeof *ds);
149 ds->name = xstrdup (name);
150 ds->display = DATASET_FRONT;
151 ds->dict = dict_create (get_default_encoding ());
152
153 ds->caseinit = caseinit_create ();
154
155 dataset_create_finish__ (ds, session);
156
157 return ds;
158 }
159
160 /* Creates and returns a new dataset that has the same data and dictionary as
161 OLD named NAME, adds it to the same session as OLD, and returns the new
162 dataset. If SESSION already contains a dataset named NAME, it is deleted
163 and replaced.
164
165 OLD must not have any active transformations or temporary state and must
166 not be in the middle of a procedure.
167
168 Callbacks are not cloned. */
169 struct dataset *
dataset_clone(struct dataset * old,const char * name)170 dataset_clone (struct dataset *old, const char *name)
171 {
172 struct dataset *new;
173
174 assert (old->proc_state == PROC_COMMITTED);
175 assert (trns_chain_is_empty (old->permanent_trns_chain));
176 assert (old->permanent_dict == NULL);
177 assert (old->sink == NULL);
178 assert (old->temporary_trns_chain == NULL);
179
180 new = xzalloc (sizeof *new);
181 new->name = xstrdup (name);
182 new->display = DATASET_FRONT;
183 new->source = casereader_clone (old->source);
184 new->dict = dict_clone (old->dict);
185 new->caseinit = caseinit_clone (old->caseinit);
186 new->last_proc_invocation = old->last_proc_invocation;
187 new->ok = old->ok;
188
189 dataset_create_finish__ (new, old->session);
190
191 return new;
192 }
193
194 /* Destroys DS. */
195 void
dataset_destroy(struct dataset * ds)196 dataset_destroy (struct dataset *ds)
197 {
198 if (ds != NULL)
199 {
200 dataset_set_session (ds, NULL);
201 dataset_clear (ds);
202 dict_unref (ds->dict);
203 caseinit_destroy (ds->caseinit);
204 trns_chain_destroy (ds->permanent_trns_chain);
205 dataset_transformations_changed__ (ds, false);
206 free (ds->name);
207 free (ds);
208 }
209 }
210
211 /* Discards the active dataset's dictionary, data, and transformations. */
212 void
dataset_clear(struct dataset * ds)213 dataset_clear (struct dataset *ds)
214 {
215 assert (ds->proc_state == PROC_COMMITTED);
216
217 dict_clear (ds->dict);
218 fh_set_default_handle (NULL);
219
220 ds->n_lag = 0;
221
222 casereader_destroy (ds->source);
223 ds->source = NULL;
224
225 proc_cancel_all_transformations (ds);
226 }
227
228 const char *
dataset_name(const struct dataset * ds)229 dataset_name (const struct dataset *ds)
230 {
231 return ds->name;
232 }
233
234 void
dataset_set_name(struct dataset * ds,const char * name)235 dataset_set_name (struct dataset *ds, const char *name)
236 {
237 struct session *session = ds->session;
238 bool active = false;
239
240 if (session != NULL)
241 {
242 active = session_active_dataset (session) == ds;
243 if (active)
244 session_set_active_dataset (session, NULL);
245 dataset_set_session (ds, NULL);
246 }
247
248 free (ds->name);
249 ds->name = xstrdup (name);
250
251 if (session != NULL)
252 {
253 dataset_set_session (ds, session);
254 if (active)
255 session_set_active_dataset (session, ds);
256 }
257 }
258
259 struct session *
dataset_session(const struct dataset * ds)260 dataset_session (const struct dataset *ds)
261 {
262 return ds->session;
263 }
264
265 void
dataset_set_session(struct dataset * ds,struct session * session)266 dataset_set_session (struct dataset *ds, struct session *session)
267 {
268 if (session != ds->session)
269 {
270 if (ds->session != NULL)
271 session_remove_dataset (ds->session, ds);
272 if (session != NULL)
273 session_add_dataset (session, ds);
274 }
275 }
276
277 /* Returns the dictionary within DS. This is always nonnull, although it
278 might not contain any variables. */
279 struct dictionary *
dataset_dict(const struct dataset * ds)280 dataset_dict (const struct dataset *ds)
281 {
282 return ds->dict;
283 }
284
285 /* Replaces DS's dictionary by DICT, discarding any source and
286 transformations. */
287 void
dataset_set_dict(struct dataset * ds,struct dictionary * dict)288 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
289 {
290 assert (ds->proc_state == PROC_COMMITTED);
291 assert (ds->dict != dict);
292
293 dataset_clear (ds);
294
295 dict_unref (ds->dict);
296 ds->dict = dict;
297 dict_set_change_callback (ds->dict, dict_callback, ds);
298 }
299
300 /* Returns the casereader that will be read when a procedure is executed on
301 DS. This can be NULL if none has been set up yet. */
302 const struct casereader *
dataset_source(const struct dataset * ds)303 dataset_source (const struct dataset *ds)
304 {
305 return ds->source;
306 }
307
308 /* Returns true if DS has a data source, false otherwise. */
309 bool
dataset_has_source(const struct dataset * ds)310 dataset_has_source (const struct dataset *ds)
311 {
312 return dataset_source (ds) != NULL;
313 }
314
315 /* Replaces the active dataset's data by READER. READER's cases must have an
316 appropriate format for DS's dictionary. */
317 bool
dataset_set_source(struct dataset * ds,struct casereader * reader)318 dataset_set_source (struct dataset *ds, struct casereader *reader)
319 {
320 casereader_destroy (ds->source);
321 ds->source = reader;
322
323 caseinit_clear (ds->caseinit);
324 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
325
326 return reader == NULL || !casereader_error (reader);
327 }
328
329 /* Returns the data source from DS and removes it from DS. Returns a null
330 pointer if DS has no data source. */
331 struct casereader *
dataset_steal_source(struct dataset * ds)332 dataset_steal_source (struct dataset *ds)
333 {
334 struct casereader *reader = ds->source;
335 ds->source = NULL;
336
337 return reader;
338 }
339
340 /* Returns a number unique to DS. It can be used to distinguish one dataset
341 from any other within a given program run, even datasets that do not exist
342 at the same time. */
343 unsigned int
dataset_seqno(const struct dataset * ds)344 dataset_seqno (const struct dataset *ds)
345 {
346 return ds->seqno;
347 }
348
349 void
dataset_set_callbacks(struct dataset * ds,const struct dataset_callbacks * callbacks,void * cb_data)350 dataset_set_callbacks (struct dataset *ds,
351 const struct dataset_callbacks *callbacks,
352 void *cb_data)
353 {
354 ds->callbacks = callbacks;
355 ds->cb_data = cb_data;
356 }
357
358 enum dataset_display
dataset_get_display(const struct dataset * ds)359 dataset_get_display (const struct dataset *ds)
360 {
361 return ds->display;
362 }
363
364 void
dataset_set_display(struct dataset * ds,enum dataset_display display)365 dataset_set_display (struct dataset *ds, enum dataset_display display)
366 {
367 ds->display = display;
368 }
369
370 /* Returns the last time the data was read. */
371 time_t
time_of_last_procedure(struct dataset * ds)372 time_of_last_procedure (struct dataset *ds)
373 {
374 if (ds->last_proc_invocation == 0)
375 update_last_proc_invocation (ds);
376 return ds->last_proc_invocation;
377 }
378
379 /* Regular procedure. */
380
381 /* Executes any pending transformations, if necessary.
382 This is not identical to the EXECUTE command in that it won't
383 always read the source data. This can be important when the
384 source data is given inline within BEGIN DATA...END FILE. */
385 bool
proc_execute(struct dataset * ds)386 proc_execute (struct dataset *ds)
387 {
388 bool ok;
389
390 if ((ds->temporary_trns_chain == NULL
391 || trns_chain_is_empty (ds->temporary_trns_chain))
392 && trns_chain_is_empty (ds->permanent_trns_chain))
393 {
394 ds->n_lag = 0;
395 ds->discard_output = false;
396 dict_set_case_limit (ds->dict, 0);
397 dict_clear_vectors (ds->dict);
398 return true;
399 }
400
401 ok = casereader_destroy (proc_open (ds));
402 return proc_commit (ds) && ok;
403 }
404
405 static const struct casereader_class proc_casereader_class;
406
407 /* Opens dataset DS for reading cases with proc_read. If FILTER is true, then
408 cases filtered out with FILTER BY will not be included in the casereader
409 (which is usually desirable). If FILTER is false, all cases will be
410 included regardless of FILTER BY settings.
411
412 proc_commit must be called when done. */
413 struct casereader *
proc_open_filtering(struct dataset * ds,bool filter)414 proc_open_filtering (struct dataset *ds, bool filter)
415 {
416 struct casereader *reader;
417
418 assert (ds->source != NULL);
419 assert (ds->proc_state == PROC_COMMITTED);
420
421 update_last_proc_invocation (ds);
422
423 caseinit_mark_for_init (ds->caseinit, ds->dict);
424
425 /* Finish up the collection of transformations. */
426 add_case_limit_trns (ds);
427 if (filter)
428 add_filter_trns (ds);
429 trns_chain_finalize (ds->cur_trns_chain);
430
431 /* Make permanent_dict refer to the dictionary right before
432 data reaches the sink. */
433 if (ds->permanent_dict == NULL)
434 ds->permanent_dict = ds->dict;
435
436 /* Prepare sink. */
437 if (!ds->discard_output)
438 {
439 struct dictionary *pd = ds->permanent_dict;
440 size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
441 if (compacted_value_cnt < dict_get_next_value_idx (pd))
442 {
443 struct caseproto *compacted_proto;
444 compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
445 ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
446 ds->sink = autopaging_writer_create (compacted_proto);
447 caseproto_unref (compacted_proto);
448 }
449 else
450 {
451 ds->compactor = NULL;
452 ds->sink = autopaging_writer_create (dict_get_proto (pd));
453 }
454 }
455 else
456 {
457 ds->compactor = NULL;
458 ds->sink = NULL;
459 }
460
461 /* Allocate memory for lagged cases. */
462 ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
463
464 ds->proc_state = PROC_OPEN;
465 ds->cases_written = 0;
466 ds->ok = true;
467
468 /* FIXME: use taint in dataset in place of `ok'? */
469 /* FIXME: for trivial cases we can just return a clone of
470 ds->source? */
471
472 /* Create casereader and insert a shim on top. The shim allows us to
473 arbitrarily extend the casereader's lifetime, by slurping the cases into
474 the shim's buffer in proc_commit(). That is especially useful when output
475 table_items are generated directly from the procedure casereader (e.g. by
476 the LIST procedure) when we are using an output driver that keeps a
477 reference to the output items passed to it (e.g. the GUI output driver in
478 PSPPIRE). */
479 reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
480 CASENUMBER_MAX,
481 &proc_casereader_class, ds);
482 ds->shim = casereader_shim_insert (reader);
483 return reader;
484 }
485
/* Opens dataset DS for reading cases with proc_read, excluding cases that
   are filtered out.  Equivalent to proc_open_filtering() with FILTER true.
   proc_commit must be called when done. */
struct casereader *
proc_open (struct dataset *ds)
{
  const bool filter = true;

  return proc_open_filtering (ds, filter);
}
493
494 /* Returns true if a procedure is in progress, that is, if
495 proc_open has been called but proc_commit has not. */
496 bool
proc_is_open(const struct dataset * ds)497 proc_is_open (const struct dataset *ds)
498 {
499 return ds->proc_state != PROC_COMMITTED;
500 }
501
/* "read" function for procedure casereader.

   DS_ is the dataset.  Reads cases from the dataset's source until one
   passes through both the permanent and the temporary transformations, and
   returns that case.  Returns a null pointer when the source is exhausted
   or when an error has been recorded in DS->ok.

   Each case that passes the permanent transformations is also saved as a
   lagged case (if lagging is in use) and written to the sink (if any),
   whether or not the temporary transformations later drop it. */
static struct ccase *
proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
{
  struct dataset *ds = ds_;
  enum trns_result retval = TRNS_DROP_CASE;
  struct ccase *c;

  assert (ds->proc_state == PROC_OPEN);
  /* The increment clause runs only after a `continue', at which point C
     holds a case that the transformations rejected and must be released. */
  for (; ; case_unref (c))
    {
      casenumber case_nr;

      /* A transformation error on the previous pass is recorded in DS->ok,
         and any recorded error ends reading. */
      assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
      if (retval == TRNS_ERROR)
        ds->ok = false;
      if (!ds->ok)
        return NULL;

      /* Read a case from source. */
      c = casereader_read (ds->source);
      if (c == NULL)
        return NULL;
      c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
      caseinit_init_vars (ds->caseinit, c);

      /* Execute permanent transformations. */
      case_nr = ds->cases_written + 1;
      retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
                                   &c, case_nr);
      /* Left variables are updated even if the case is then dropped. */
      caseinit_update_left_vars (ds->caseinit, c);
      if (retval != TRNS_CONTINUE)
        continue;

      /* Write case to collection of lagged cases. */
      if (ds->n_lag > 0)
        {
          /* Evict the oldest cases to make room for this one. */
          while (deque_count (&ds->lag) >= ds->n_lag)
            case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
          ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
        }

      /* Write case to replacement dataset. */
      ds->cases_written++;
      if (ds->sink != NULL)
        casewriter_write (ds->sink,
                          case_map_execute (ds->compactor, case_ref (c)));

      /* Execute temporary transformations. */
      if (ds->temporary_trns_chain != NULL)
        {
          retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
                                       &c, ds->cases_written);
          if (retval != TRNS_CONTINUE)
            continue;
        }

      return c;
    }
}
562
563 /* "destroy" function for procedure casereader. */
564 static void
proc_casereader_destroy(struct casereader * reader,void * ds_)565 proc_casereader_destroy (struct casereader *reader, void *ds_)
566 {
567 struct dataset *ds = ds_;
568 struct ccase *c;
569
570 /* We are always the subreader for a casereader_buffer, so if we're being
571 destroyed then it's because the casereader_buffer has read all the cases
572 that it ever will. */
573 ds->shim = NULL;
574
575 /* Make sure transformations happen for every input case, in
576 case they have side effects, and ensure that the replacement
577 active dataset gets all the cases it should. */
578 while ((c = casereader_read (reader)) != NULL)
579 case_unref (c);
580
581 ds->proc_state = PROC_CLOSED;
582 ds->ok = casereader_destroy (ds->source) && ds->ok;
583 ds->source = NULL;
584 dataset_set_source (ds, NULL);
585 }
586
587 /* Must return false if the source casereader, a transformation,
588 or the sink casewriter signaled an error. (If a temporary
589 transformation signals an error, then the return value is
590 false, but the replacement active dataset may still be
591 untainted.) */
592 bool
proc_commit(struct dataset * ds)593 proc_commit (struct dataset *ds)
594 {
595 if (ds->shim != NULL)
596 casereader_shim_slurp (ds->shim);
597
598 assert (ds->proc_state == PROC_CLOSED);
599 ds->proc_state = PROC_COMMITTED;
600
601 dataset_changed__ (ds);
602
603 /* Free memory for lagged cases. */
604 while (!deque_is_empty (&ds->lag))
605 case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
606 free (ds->lag_cases);
607
608 /* Dictionary from before TEMPORARY becomes permanent. */
609 proc_cancel_temporary_transformations (ds);
610
611 if (!ds->discard_output)
612 {
613 /* Finish compacting. */
614 if (ds->compactor != NULL)
615 {
616 case_map_destroy (ds->compactor);
617 ds->compactor = NULL;
618
619 dict_delete_scratch_vars (ds->dict);
620 dict_compact_values (ds->dict);
621 }
622
623 /* Old data sink becomes new data source. */
624 if (ds->sink != NULL)
625 ds->source = casewriter_make_reader (ds->sink);
626 }
627 else
628 {
629 ds->source = NULL;
630 ds->discard_output = false;
631 }
632 ds->sink = NULL;
633
634 caseinit_clear (ds->caseinit);
635 caseinit_mark_as_preinited (ds->caseinit, ds->dict);
636
637 dict_clear_vectors (ds->dict);
638 ds->permanent_dict = NULL;
639 return proc_cancel_all_transformations (ds) && ds->ok;
640 }
641
/* Casereader class for procedure execution.  Only the first two members
   are provided; the remaining two are left null. */
static const struct casereader_class proc_casereader_class =
  {
    proc_casereader_read,       /* Reads the next case. */
    proc_casereader_destroy,    /* Finishes and destroys the reader. */
    NULL,
    NULL,
  };
650
651 /* Updates last_proc_invocation. */
652 static void
update_last_proc_invocation(struct dataset * ds)653 update_last_proc_invocation (struct dataset *ds)
654 {
655 ds->last_proc_invocation = time (NULL);
656 }
657
658 /* Returns a pointer to the lagged case from N_BEFORE cases before the
659 current one, or NULL if there haven't been that many cases yet. */
660 const struct ccase *
lagged_case(const struct dataset * ds,int n_before)661 lagged_case (const struct dataset *ds, int n_before)
662 {
663 assert (n_before >= 1);
664 assert (n_before <= ds->n_lag);
665
666 if (n_before <= deque_count (&ds->lag))
667 return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
668 else
669 return NULL;
670 }
671
672 /* Returns the current set of permanent transformations,
673 and clears the permanent transformations.
674 For use by INPUT PROGRAM. */
675 struct trns_chain *
proc_capture_transformations(struct dataset * ds)676 proc_capture_transformations (struct dataset *ds)
677 {
678 struct trns_chain *chain;
679
680 assert (ds->temporary_trns_chain == NULL);
681 chain = ds->permanent_trns_chain;
682 ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
683 dataset_transformations_changed__ (ds, false);
684
685 return chain;
686 }
687
688 /* Adds a transformation that processes a case with PROC and
689 frees itself with FREE to the current set of transformations.
690 The functions are passed AUX as auxiliary data. */
691 void
add_transformation(struct dataset * ds,trns_proc_func * proc,trns_free_func * free,void * aux)692 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
693 {
694 trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
695 dataset_transformations_changed__ (ds, true);
696 }
697
698 /* Adds a transformation that processes a case with PROC and
699 frees itself with FREE to the current set of transformations.
700 When parsing of the block of transformations is complete,
701 FINALIZE will be called.
702 The functions are passed AUX as auxiliary data. */
703 void
add_transformation_with_finalizer(struct dataset * ds,trns_finalize_func * finalize,trns_proc_func * proc,trns_free_func * free,void * aux)704 add_transformation_with_finalizer (struct dataset *ds,
705 trns_finalize_func *finalize,
706 trns_proc_func *proc,
707 trns_free_func *free, void *aux)
708 {
709 trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
710 dataset_transformations_changed__ (ds, true);
711 }
712
713 /* Returns the index of the next transformation.
714 This value can be returned by a transformation procedure
715 function to indicate a "jump" to that transformation. */
716 size_t
next_transformation(const struct dataset * ds)717 next_transformation (const struct dataset *ds)
718 {
719 return trns_chain_next (ds->cur_trns_chain);
720 }
721
722 /* Returns true if the next call to add_transformation() will add
723 a temporary transformation, false if it will add a permanent
724 transformation. */
725 bool
proc_in_temporary_transformations(const struct dataset * ds)726 proc_in_temporary_transformations (const struct dataset *ds)
727 {
728 return ds->temporary_trns_chain != NULL;
729 }
730
731 /* Marks the start of temporary transformations.
732 Further calls to add_transformation() will add temporary
733 transformations. */
734 void
proc_start_temporary_transformations(struct dataset * ds)735 proc_start_temporary_transformations (struct dataset *ds)
736 {
737 if (!proc_in_temporary_transformations (ds))
738 {
739 add_case_limit_trns (ds);
740
741 ds->permanent_dict = dict_clone (ds->dict);
742
743 trns_chain_finalize (ds->permanent_trns_chain);
744 ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
745 dataset_transformations_changed__ (ds, true);
746 }
747 }
748
749 /* Converts all the temporary transformations, if any, to permanent
750 transformations. Further transformations will be permanent.
751
752 The FILTER command is implemented as a temporary transformation, so a
753 procedure that uses this function should usually use proc_open_filtering()
754 with FILTER false, instead of plain proc_open().
755
756 Returns true if anything changed, false otherwise. */
757 bool
proc_make_temporary_transformations_permanent(struct dataset * ds)758 proc_make_temporary_transformations_permanent (struct dataset *ds)
759 {
760 if (proc_in_temporary_transformations (ds))
761 {
762 trns_chain_finalize (ds->temporary_trns_chain);
763 trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
764 ds->temporary_trns_chain = NULL;
765
766 ds->cur_trns_chain = ds->permanent_trns_chain;
767
768 dict_unref (ds->permanent_dict);
769 ds->permanent_dict = NULL;
770
771 return true;
772 }
773 else
774 return false;
775 }
776
777 /* Cancels all temporary transformations, if any. Further
778 transformations will be permanent.
779 Returns true if anything changed, false otherwise. */
780 bool
proc_cancel_temporary_transformations(struct dataset * ds)781 proc_cancel_temporary_transformations (struct dataset *ds)
782 {
783 if (proc_in_temporary_transformations (ds))
784 {
785 dict_unref (ds->dict);
786 ds->dict = ds->permanent_dict;
787 ds->permanent_dict = NULL;
788
789 trns_chain_destroy (ds->temporary_trns_chain);
790 ds->temporary_trns_chain = NULL;
791 dataset_transformations_changed__ (
792 ds, !trns_chain_is_empty (ds->permanent_trns_chain));
793 return true;
794 }
795 else
796 return false;
797 }
798
799 /* Cancels all transformations, if any.
800 Returns true if successful, false on I/O error. */
801 bool
proc_cancel_all_transformations(struct dataset * ds)802 proc_cancel_all_transformations (struct dataset *ds)
803 {
804 bool ok;
805 assert (ds->proc_state == PROC_COMMITTED);
806 ok = trns_chain_destroy (ds->permanent_trns_chain);
807 ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
808 ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
809 ds->temporary_trns_chain = NULL;
810 dataset_transformations_changed__ (ds, false);
811
812 return ok;
813 }
814
815 static int
store_case_num(void * var_,struct ccase ** cc,casenumber case_num)816 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
817 {
818 struct variable *var = var_;
819
820 *cc = case_unshare (*cc);
821 case_data_rw (*cc, var)->f = case_num;
822
823 return TRNS_CONTINUE;
824 }
825
826 /* Add a variable which we can sort by to get back the original order. */
827 struct variable *
add_permanent_ordering_transformation(struct dataset * ds)828 add_permanent_ordering_transformation (struct dataset *ds)
829 {
830 struct variable *temp_var;
831
832 temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
833 if (proc_in_temporary_transformations (ds))
834 {
835 struct variable *perm_var;
836
837 perm_var = dict_clone_var_in_place_assert (ds->permanent_dict, temp_var);
838 trns_chain_append (ds->permanent_trns_chain, NULL, store_case_num,
839 NULL, perm_var);
840 trns_chain_finalize (ds->permanent_trns_chain);
841 }
842 else
843 add_transformation (ds, store_case_num, NULL, temp_var);
844
845 return temp_var;
846 }
847
848 /* Causes output from the next procedure to be discarded, instead
849 of being preserved for use as input for the next procedure. */
850 void
proc_discard_output(struct dataset * ds)851 proc_discard_output (struct dataset *ds)
852 {
853 ds->discard_output = true;
854 }
855
856
857 /* Checks whether DS has a corrupted active dataset. If so,
858 discards it and returns false. If not, returns true without
859 doing anything. */
860 bool
dataset_end_of_command(struct dataset * ds)861 dataset_end_of_command (struct dataset *ds)
862 {
863 if (ds->source != NULL)
864 {
865 if (casereader_error (ds->source))
866 {
867 dataset_clear (ds);
868 return false;
869 }
870 else
871 {
872 const struct taint *taint = casereader_get_taint (ds->source);
873 taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
874 assert (!taint_has_tainted_successor (taint));
875 }
876 }
877 return true;
878 }
879
880 static trns_proc_func case_limit_trns_proc;
881 static trns_free_func case_limit_trns_free;
882
883 /* Adds a transformation that limits the number of cases that may
884 pass through, if DS->DICT has a case limit. */
885 static void
add_case_limit_trns(struct dataset * ds)886 add_case_limit_trns (struct dataset *ds)
887 {
888 casenumber case_limit = dict_get_case_limit (ds->dict);
889 if (case_limit != 0)
890 {
891 casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
892 *cases_remaining = case_limit;
893 add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
894 cases_remaining);
895 dict_set_case_limit (ds->dict, 0);
896 }
897 }
898
899 /* Limits the maximum number of cases processed to
900 *CASES_REMAINING. */
901 static int
case_limit_trns_proc(void * cases_remaining_,struct ccase ** c UNUSED,casenumber case_nr UNUSED)902 case_limit_trns_proc (void *cases_remaining_,
903 struct ccase **c UNUSED, casenumber case_nr UNUSED)
904 {
905 size_t *cases_remaining = cases_remaining_;
906 if (*cases_remaining > 0)
907 {
908 (*cases_remaining)--;
909 return TRNS_CONTINUE;
910 }
911 else
912 return TRNS_DROP_CASE;
913 }
914
/* Frees the counter CASES_REMAINING_ that belongs to a case limit
   transformation.  Always returns true. */
static bool
case_limit_trns_free (void *cases_remaining_)
{
  free (cases_remaining_);
  return true;
}
923
924 static trns_proc_func filter_trns_proc;
925
926 /* Adds a temporary transformation to filter data according to
927 the variable specified on FILTER, if any. */
928 static void
add_filter_trns(struct dataset * ds)929 add_filter_trns (struct dataset *ds)
930 {
931 struct variable *filter_var = dict_get_filter (ds->dict);
932 if (filter_var != NULL)
933 {
934 proc_start_temporary_transformations (ds);
935 add_transformation (ds, filter_trns_proc, NULL, filter_var);
936 }
937 }
938
939 /* FILTER transformation. */
940 static int
filter_trns_proc(void * filter_var_,struct ccase ** c,casenumber case_nr UNUSED)941 filter_trns_proc (void *filter_var_,
942 struct ccase **c, casenumber case_nr UNUSED)
943
944 {
945 struct variable *filter_var = filter_var_;
946 double f = case_num (*c, filter_var);
947 return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
948 ? TRNS_CONTINUE : TRNS_DROP_CASE);
949 }
950
951
952 void
dataset_need_lag(struct dataset * ds,int n_before)953 dataset_need_lag (struct dataset *ds, int n_before)
954 {
955 ds->n_lag = MAX (ds->n_lag, n_before);
956 }
957
958 static void
dataset_changed__(struct dataset * ds)959 dataset_changed__ (struct dataset *ds)
960 {
961 if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
962 ds->callbacks->changed (ds->cb_data);
963 }
964
965 static void
dataset_transformations_changed__(struct dataset * ds,bool non_empty)966 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
967 {
968 if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
969 ds->callbacks->transformations_changed (non_empty, ds->cb_data);
970 }
971
972 /* Private interface for use by session code. */
973
974 void
dataset_set_session__(struct dataset * ds,struct session * session)975 dataset_set_session__ (struct dataset *ds, struct session *session)
976 {
977 ds->session = session;
978 }
979