1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010, 2011, 2013 Free Software Foundation, Inc.
3 
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16 
17 #include <config.h>
18 
19 #include "data/dataset.h"
20 
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25 
26 #include "data/case.h"
27 #include "data/case-map.h"
28 #include "data/caseinit.h"
29 #include "data/casereader.h"
30 #include "data/casereader-provider.h"
31 #include "data/casereader-shim.h"
32 #include "data/casewriter.h"
33 #include "data/dictionary.h"
34 #include "data/file-handle-def.h"
35 #include "data/session.h"
36 #include "data/transformations.h"
37 #include "data/variable.h"
38 #include "libpspp/deque.h"
39 #include "libpspp/misc.h"
40 #include "libpspp/str.h"
41 #include "libpspp/taint.h"
42 #include "libpspp/i18n.h"
43 
44 #include "gl/minmax.h"
45 #include "gl/xalloc.h"
46 
47 struct dataset {
48   /* A dataset is usually part of a session.  Within a session its name must
49      unique.  The name must either be a valid PSPP identifier or the empty
50      string.  (It must be unique within the session even if it is the empty
51      string; that is, there may only be a single dataset within a session with
52      the empty string as its name.) */
53   struct session *session;
54   char *name;
55   enum dataset_display display;
56 
57   /* Cases are read from source,
58      their transformation variables are initialized,
59      pass through permanent_trns_chain (which transforms them into
60      the format described by permanent_dict),
61      are written to sink,
62      pass through temporary_trns_chain (which transforms them into
63      the format described by dict),
64      and are finally passed to the procedure. */
65   struct casereader *source;
66   struct caseinit *caseinit;
67   struct trns_chain *permanent_trns_chain;
68   struct dictionary *permanent_dict;
69   struct casewriter *sink;
70   struct trns_chain *temporary_trns_chain;
71   struct dictionary *dict;
72 
73   /* If true, cases are discarded instead of being written to
74      sink. */
75   bool discard_output;
76 
77   /* The transformation chain that the next transformation will be
78      added to. */
79   struct trns_chain *cur_trns_chain;
80 
81   /* The case map used to compact a case, if necessary;
82      otherwise a null pointer. */
83   struct case_map *compactor;
84 
85   /* Time at which proc was last invoked. */
86   time_t last_proc_invocation;
87 
88   /* Cases just before ("lagging") the current one. */
89   int n_lag;			/* Number of cases to lag. */
90   struct deque lag;             /* Deque of lagged cases. */
91   struct ccase **lag_cases;     /* Lagged cases managed by deque. */
92 
93   /* Procedure data. */
94   enum
95     {
96       PROC_COMMITTED,           /* No procedure in progress. */
97       PROC_OPEN,                /* proc_open called, casereader still open. */
98       PROC_CLOSED               /* casereader from proc_open destroyed,
99                                    but proc_commit not yet called. */
100     }
101   proc_state;
102   casenumber cases_written;     /* Cases output so far. */
103   bool ok;                      /* Error status. */
104   struct casereader_shim *shim; /* Shim on proc_open() casereader. */
105 
106   const struct dataset_callbacks *callbacks;
107   void *cb_data;
108 
109   /* Uniquely distinguishes datasets. */
110   unsigned int seqno;
111 };
112 
/* Client notification helpers, defined near the end of this file. */
static void dataset_changed__ (struct dataset *);
static void dataset_transformations_changed__ (struct dataset *,
                                               bool non_empty);

/* Transformations that are appended automatically when a procedure opens. */
static void add_case_limit_trns (struct dataset *ds);
static void add_filter_trns (struct dataset *ds);

/* Timestamp bookkeeping. */
static void update_last_proc_invocation (struct dataset *ds);
121 
122 static void
dict_callback(struct dictionary * d UNUSED,void * ds_)123 dict_callback (struct dictionary *d UNUSED, void *ds_)
124 {
125   struct dataset *ds = ds_;
126   dataset_changed__ (ds);
127 }
128 
129 static void
dataset_create_finish__(struct dataset * ds,struct session * session)130 dataset_create_finish__ (struct dataset *ds, struct session *session)
131 {
132   static unsigned int seqno;
133 
134   dict_set_change_callback (ds->dict, dict_callback, ds);
135   proc_cancel_all_transformations (ds);
136   dataset_set_session (ds, session);
137   ds->seqno = ++seqno;
138 }
139 
140 /* Creates a new dataset named NAME, adds it to SESSION, and returns it.  If
141    SESSION already contains a dataset named NAME, it is deleted and replaced.
142    The dataset initially has an empty dictionary and no data source. */
143 struct dataset *
dataset_create(struct session * session,const char * name)144 dataset_create (struct session *session, const char *name)
145 {
146   struct dataset *ds;
147 
148   ds = xzalloc (sizeof *ds);
149   ds->name = xstrdup (name);
150   ds->display = DATASET_FRONT;
151   ds->dict = dict_create (get_default_encoding ());
152 
153   ds->caseinit = caseinit_create ();
154 
155   dataset_create_finish__ (ds, session);
156 
157   return ds;
158 }
159 
160 /* Creates and returns a new dataset that has the same data and dictionary as
161    OLD named NAME, adds it to the same session as OLD, and returns the new
162    dataset.  If SESSION already contains a dataset named NAME, it is deleted
163    and replaced.
164 
165    OLD must not have any active transformations or temporary state and must
166    not be in the middle of a procedure.
167 
168    Callbacks are not cloned. */
169 struct dataset *
dataset_clone(struct dataset * old,const char * name)170 dataset_clone (struct dataset *old, const char *name)
171 {
172   struct dataset *new;
173 
174   assert (old->proc_state == PROC_COMMITTED);
175   assert (trns_chain_is_empty (old->permanent_trns_chain));
176   assert (old->permanent_dict == NULL);
177   assert (old->sink == NULL);
178   assert (old->temporary_trns_chain == NULL);
179 
180   new = xzalloc (sizeof *new);
181   new->name = xstrdup (name);
182   new->display = DATASET_FRONT;
183   new->source = casereader_clone (old->source);
184   new->dict = dict_clone (old->dict);
185   new->caseinit = caseinit_clone (old->caseinit);
186   new->last_proc_invocation = old->last_proc_invocation;
187   new->ok = old->ok;
188 
189   dataset_create_finish__ (new, old->session);
190 
191   return new;
192 }
193 
194 /* Destroys DS. */
195 void
dataset_destroy(struct dataset * ds)196 dataset_destroy (struct dataset *ds)
197 {
198   if (ds != NULL)
199     {
200       dataset_set_session (ds, NULL);
201       dataset_clear (ds);
202       dict_unref (ds->dict);
203       caseinit_destroy (ds->caseinit);
204       trns_chain_destroy (ds->permanent_trns_chain);
205       dataset_transformations_changed__ (ds, false);
206       free (ds->name);
207       free (ds);
208     }
209 }
210 
211 /* Discards the active dataset's dictionary, data, and transformations. */
212 void
dataset_clear(struct dataset * ds)213 dataset_clear (struct dataset *ds)
214 {
215   assert (ds->proc_state == PROC_COMMITTED);
216 
217   dict_clear (ds->dict);
218   fh_set_default_handle (NULL);
219 
220   ds->n_lag = 0;
221 
222   casereader_destroy (ds->source);
223   ds->source = NULL;
224 
225   proc_cancel_all_transformations (ds);
226 }
227 
228 const char *
dataset_name(const struct dataset * ds)229 dataset_name (const struct dataset *ds)
230 {
231   return ds->name;
232 }
233 
234 void
dataset_set_name(struct dataset * ds,const char * name)235 dataset_set_name (struct dataset *ds, const char *name)
236 {
237   struct session *session = ds->session;
238   bool active = false;
239 
240   if (session != NULL)
241     {
242       active = session_active_dataset (session) == ds;
243       if (active)
244         session_set_active_dataset (session, NULL);
245       dataset_set_session (ds, NULL);
246     }
247 
248   free (ds->name);
249   ds->name = xstrdup (name);
250 
251   if (session != NULL)
252     {
253       dataset_set_session (ds, session);
254       if (active)
255         session_set_active_dataset (session, ds);
256     }
257 }
258 
259 struct session *
dataset_session(const struct dataset * ds)260 dataset_session (const struct dataset *ds)
261 {
262   return ds->session;
263 }
264 
265 void
dataset_set_session(struct dataset * ds,struct session * session)266 dataset_set_session (struct dataset *ds, struct session *session)
267 {
268   if (session != ds->session)
269     {
270       if (ds->session != NULL)
271         session_remove_dataset (ds->session, ds);
272       if (session != NULL)
273         session_add_dataset (session, ds);
274     }
275 }
276 
277 /* Returns the dictionary within DS.  This is always nonnull, although it
278    might not contain any variables. */
279 struct dictionary *
dataset_dict(const struct dataset * ds)280 dataset_dict (const struct dataset *ds)
281 {
282   return ds->dict;
283 }
284 
285 /* Replaces DS's dictionary by DICT, discarding any source and
286    transformations. */
287 void
dataset_set_dict(struct dataset * ds,struct dictionary * dict)288 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
289 {
290   assert (ds->proc_state == PROC_COMMITTED);
291   assert (ds->dict != dict);
292 
293   dataset_clear (ds);
294 
295   dict_unref (ds->dict);
296   ds->dict = dict;
297   dict_set_change_callback (ds->dict, dict_callback, ds);
298 }
299 
300 /* Returns the casereader that will be read when a procedure is executed on
301    DS.  This can be NULL if none has been set up yet. */
302 const struct casereader *
dataset_source(const struct dataset * ds)303 dataset_source (const struct dataset *ds)
304 {
305   return ds->source;
306 }
307 
308 /* Returns true if DS has a data source, false otherwise. */
309 bool
dataset_has_source(const struct dataset * ds)310 dataset_has_source (const struct dataset *ds)
311 {
312   return dataset_source (ds) != NULL;
313 }
314 
315 /* Replaces the active dataset's data by READER.  READER's cases must have an
316    appropriate format for DS's dictionary. */
317 bool
dataset_set_source(struct dataset * ds,struct casereader * reader)318 dataset_set_source (struct dataset *ds, struct casereader *reader)
319 {
320   casereader_destroy (ds->source);
321   ds->source = reader;
322 
323   caseinit_clear (ds->caseinit);
324   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
325 
326   return reader == NULL || !casereader_error (reader);
327 }
328 
329 /* Returns the data source from DS and removes it from DS.  Returns a null
330    pointer if DS has no data source. */
331 struct casereader *
dataset_steal_source(struct dataset * ds)332 dataset_steal_source (struct dataset *ds)
333 {
334   struct casereader *reader = ds->source;
335   ds->source = NULL;
336 
337   return reader;
338 }
339 
340 /* Returns a number unique to DS.  It can be used to distinguish one dataset
341    from any other within a given program run, even datasets that do not exist
342    at the same time. */
343 unsigned int
dataset_seqno(const struct dataset * ds)344 dataset_seqno (const struct dataset *ds)
345 {
346   return ds->seqno;
347 }
348 
349 void
dataset_set_callbacks(struct dataset * ds,const struct dataset_callbacks * callbacks,void * cb_data)350 dataset_set_callbacks (struct dataset *ds,
351                        const struct dataset_callbacks *callbacks,
352                        void *cb_data)
353 {
354   ds->callbacks = callbacks;
355   ds->cb_data = cb_data;
356 }
357 
358 enum dataset_display
dataset_get_display(const struct dataset * ds)359 dataset_get_display (const struct dataset *ds)
360 {
361   return ds->display;
362 }
363 
364 void
dataset_set_display(struct dataset * ds,enum dataset_display display)365 dataset_set_display (struct dataset *ds, enum dataset_display display)
366 {
367   ds->display = display;
368 }
369 
370 /* Returns the last time the data was read. */
371 time_t
time_of_last_procedure(struct dataset * ds)372 time_of_last_procedure (struct dataset *ds)
373 {
374   if (ds->last_proc_invocation == 0)
375     update_last_proc_invocation (ds);
376   return ds->last_proc_invocation;
377 }
378 
379 /* Regular procedure. */
380 
381 /* Executes any pending transformations, if necessary.
382    This is not identical to the EXECUTE command in that it won't
383    always read the source data.  This can be important when the
384    source data is given inline within BEGIN DATA...END FILE. */
385 bool
proc_execute(struct dataset * ds)386 proc_execute (struct dataset *ds)
387 {
388   bool ok;
389 
390   if ((ds->temporary_trns_chain == NULL
391        || trns_chain_is_empty (ds->temporary_trns_chain))
392       && trns_chain_is_empty (ds->permanent_trns_chain))
393     {
394       ds->n_lag = 0;
395       ds->discard_output = false;
396       dict_set_case_limit (ds->dict, 0);
397       dict_clear_vectors (ds->dict);
398       return true;
399     }
400 
401   ok = casereader_destroy (proc_open (ds));
402   return proc_commit (ds) && ok;
403 }
404 
405 static const struct casereader_class proc_casereader_class;
406 
407 /* Opens dataset DS for reading cases with proc_read.  If FILTER is true, then
408    cases filtered out with FILTER BY will not be included in the casereader
409    (which is usually desirable).  If FILTER is false, all cases will be
410    included regardless of FILTER BY settings.
411 
412    proc_commit must be called when done. */
413 struct casereader *
proc_open_filtering(struct dataset * ds,bool filter)414 proc_open_filtering (struct dataset *ds, bool filter)
415 {
416   struct casereader *reader;
417 
418   assert (ds->source != NULL);
419   assert (ds->proc_state == PROC_COMMITTED);
420 
421   update_last_proc_invocation (ds);
422 
423   caseinit_mark_for_init (ds->caseinit, ds->dict);
424 
425   /* Finish up the collection of transformations. */
426   add_case_limit_trns (ds);
427   if (filter)
428     add_filter_trns (ds);
429   trns_chain_finalize (ds->cur_trns_chain);
430 
431   /* Make permanent_dict refer to the dictionary right before
432      data reaches the sink. */
433   if (ds->permanent_dict == NULL)
434     ds->permanent_dict = ds->dict;
435 
436   /* Prepare sink. */
437   if (!ds->discard_output)
438     {
439       struct dictionary *pd = ds->permanent_dict;
440       size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
441       if (compacted_value_cnt < dict_get_next_value_idx (pd))
442         {
443           struct caseproto *compacted_proto;
444           compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
445           ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
446           ds->sink = autopaging_writer_create (compacted_proto);
447           caseproto_unref (compacted_proto);
448         }
449       else
450         {
451           ds->compactor = NULL;
452           ds->sink = autopaging_writer_create (dict_get_proto (pd));
453         }
454     }
455   else
456     {
457       ds->compactor = NULL;
458       ds->sink = NULL;
459     }
460 
461   /* Allocate memory for lagged cases. */
462   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
463 
464   ds->proc_state = PROC_OPEN;
465   ds->cases_written = 0;
466   ds->ok = true;
467 
468   /* FIXME: use taint in dataset in place of `ok'? */
469   /* FIXME: for trivial cases we can just return a clone of
470      ds->source? */
471 
472   /* Create casereader and insert a shim on top.  The shim allows us to
473      arbitrarily extend the casereader's lifetime, by slurping the cases into
474      the shim's buffer in proc_commit().  That is especially useful when output
475      table_items are generated directly from the procedure casereader (e.g. by
476      the LIST procedure) when we are using an output driver that keeps a
477      reference to the output items passed to it (e.g. the GUI output driver in
478      PSPPIRE). */
479   reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
480                                          CASENUMBER_MAX,
481                                          &proc_casereader_class, ds);
482   ds->shim = casereader_shim_insert (reader);
483   return reader;
484 }
485 
/* Begins a procedure on DS with FILTER BY honored and returns the casereader
   that supplies its cases.  Equivalent to proc_open_filtering (DS, true).
   The caller must call proc_commit when it is done. */
struct casereader *
proc_open (struct dataset *ds)
{
  const bool honor_filter = true;

  return proc_open_filtering (ds, honor_filter);
}
493 
494 /* Returns true if a procedure is in progress, that is, if
495    proc_open has been called but proc_commit has not. */
496 bool
proc_is_open(const struct dataset * ds)497 proc_is_open (const struct dataset *ds)
498 {
499   return ds->proc_state != PROC_COMMITTED;
500 }
501 
502 /* "read" function for procedure casereader. */
503 static struct ccase *
proc_casereader_read(struct casereader * reader UNUSED,void * ds_)504 proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
505 {
506   struct dataset *ds = ds_;
507   enum trns_result retval = TRNS_DROP_CASE;
508   struct ccase *c;
509 
510   assert (ds->proc_state == PROC_OPEN);
511   for (; ; case_unref (c))
512     {
513       casenumber case_nr;
514 
515       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
516       if (retval == TRNS_ERROR)
517         ds->ok = false;
518       if (!ds->ok)
519         return NULL;
520 
521       /* Read a case from source. */
522       c = casereader_read (ds->source);
523       if (c == NULL)
524         return NULL;
525       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
526       caseinit_init_vars (ds->caseinit, c);
527 
528       /* Execute permanent transformations.  */
529       case_nr = ds->cases_written + 1;
530       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
531                                    &c, case_nr);
532       caseinit_update_left_vars (ds->caseinit, c);
533       if (retval != TRNS_CONTINUE)
534         continue;
535 
536       /* Write case to collection of lagged cases. */
537       if (ds->n_lag > 0)
538         {
539           while (deque_count (&ds->lag) >= ds->n_lag)
540             case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
541           ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
542         }
543 
544       /* Write case to replacement dataset. */
545       ds->cases_written++;
546       if (ds->sink != NULL)
547         casewriter_write (ds->sink,
548                           case_map_execute (ds->compactor, case_ref (c)));
549 
550       /* Execute temporary transformations. */
551       if (ds->temporary_trns_chain != NULL)
552         {
553           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
554                                        &c, ds->cases_written);
555           if (retval != TRNS_CONTINUE)
556             continue;
557         }
558 
559       return c;
560     }
561 }
562 
563 /* "destroy" function for procedure casereader. */
564 static void
proc_casereader_destroy(struct casereader * reader,void * ds_)565 proc_casereader_destroy (struct casereader *reader, void *ds_)
566 {
567   struct dataset *ds = ds_;
568   struct ccase *c;
569 
570   /* We are always the subreader for a casereader_buffer, so if we're being
571      destroyed then it's because the casereader_buffer has read all the cases
572      that it ever will. */
573   ds->shim = NULL;
574 
575   /* Make sure transformations happen for every input case, in
576      case they have side effects, and ensure that the replacement
577      active dataset gets all the cases it should. */
578   while ((c = casereader_read (reader)) != NULL)
579     case_unref (c);
580 
581   ds->proc_state = PROC_CLOSED;
582   ds->ok = casereader_destroy (ds->source) && ds->ok;
583   ds->source = NULL;
584   dataset_set_source (ds, NULL);
585 }
586 
587 /* Must return false if the source casereader, a transformation,
588    or the sink casewriter signaled an error.  (If a temporary
589    transformation signals an error, then the return value is
590    false, but the replacement active dataset may still be
591    untainted.) */
592 bool
proc_commit(struct dataset * ds)593 proc_commit (struct dataset *ds)
594 {
595   if (ds->shim != NULL)
596     casereader_shim_slurp (ds->shim);
597 
598   assert (ds->proc_state == PROC_CLOSED);
599   ds->proc_state = PROC_COMMITTED;
600 
601   dataset_changed__ (ds);
602 
603   /* Free memory for lagged cases. */
604   while (!deque_is_empty (&ds->lag))
605     case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
606   free (ds->lag_cases);
607 
608   /* Dictionary from before TEMPORARY becomes permanent. */
609   proc_cancel_temporary_transformations (ds);
610 
611   if (!ds->discard_output)
612     {
613       /* Finish compacting. */
614       if (ds->compactor != NULL)
615         {
616           case_map_destroy (ds->compactor);
617           ds->compactor = NULL;
618 
619           dict_delete_scratch_vars (ds->dict);
620           dict_compact_values (ds->dict);
621         }
622 
623       /* Old data sink becomes new data source. */
624       if (ds->sink != NULL)
625         ds->source = casewriter_make_reader (ds->sink);
626     }
627   else
628     {
629       ds->source = NULL;
630       ds->discard_output = false;
631     }
632   ds->sink = NULL;
633 
634   caseinit_clear (ds->caseinit);
635   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
636 
637   dict_clear_vectors (ds->dict);
638   ds->permanent_dict = NULL;
639   return proc_cancel_all_transformations (ds) && ds->ok;
640 }
641 
642 /* Casereader class for procedure execution. */
643 static const struct casereader_class proc_casereader_class =
644   {
645     proc_casereader_read,
646     proc_casereader_destroy,
647     NULL,
648     NULL,
649   };
650 
651 /* Updates last_proc_invocation. */
652 static void
update_last_proc_invocation(struct dataset * ds)653 update_last_proc_invocation (struct dataset *ds)
654 {
655   ds->last_proc_invocation = time (NULL);
656 }
657 
658 /* Returns a pointer to the lagged case from N_BEFORE cases before the
659    current one, or NULL if there haven't been that many cases yet. */
660 const struct ccase *
lagged_case(const struct dataset * ds,int n_before)661 lagged_case (const struct dataset *ds, int n_before)
662 {
663   assert (n_before >= 1);
664   assert (n_before <= ds->n_lag);
665 
666   if (n_before <= deque_count (&ds->lag))
667     return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
668   else
669     return NULL;
670 }
671 
672 /* Returns the current set of permanent transformations,
673    and clears the permanent transformations.
674    For use by INPUT PROGRAM. */
675 struct trns_chain *
proc_capture_transformations(struct dataset * ds)676 proc_capture_transformations (struct dataset *ds)
677 {
678   struct trns_chain *chain;
679 
680   assert (ds->temporary_trns_chain == NULL);
681   chain = ds->permanent_trns_chain;
682   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
683   dataset_transformations_changed__ (ds, false);
684 
685   return chain;
686 }
687 
688 /* Adds a transformation that processes a case with PROC and
689    frees itself with FREE to the current set of transformations.
690    The functions are passed AUX as auxiliary data. */
691 void
add_transformation(struct dataset * ds,trns_proc_func * proc,trns_free_func * free,void * aux)692 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
693 {
694   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
695   dataset_transformations_changed__ (ds, true);
696 }
697 
698 /* Adds a transformation that processes a case with PROC and
699    frees itself with FREE to the current set of transformations.
700    When parsing of the block of transformations is complete,
701    FINALIZE will be called.
702    The functions are passed AUX as auxiliary data. */
703 void
add_transformation_with_finalizer(struct dataset * ds,trns_finalize_func * finalize,trns_proc_func * proc,trns_free_func * free,void * aux)704 add_transformation_with_finalizer (struct dataset *ds,
705 				   trns_finalize_func *finalize,
706                                    trns_proc_func *proc,
707                                    trns_free_func *free, void *aux)
708 {
709   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
710   dataset_transformations_changed__ (ds, true);
711 }
712 
713 /* Returns the index of the next transformation.
714    This value can be returned by a transformation procedure
715    function to indicate a "jump" to that transformation. */
716 size_t
next_transformation(const struct dataset * ds)717 next_transformation (const struct dataset *ds)
718 {
719   return trns_chain_next (ds->cur_trns_chain);
720 }
721 
722 /* Returns true if the next call to add_transformation() will add
723    a temporary transformation, false if it will add a permanent
724    transformation. */
725 bool
proc_in_temporary_transformations(const struct dataset * ds)726 proc_in_temporary_transformations (const struct dataset *ds)
727 {
728   return ds->temporary_trns_chain != NULL;
729 }
730 
731 /* Marks the start of temporary transformations.
732    Further calls to add_transformation() will add temporary
733    transformations. */
734 void
proc_start_temporary_transformations(struct dataset * ds)735 proc_start_temporary_transformations (struct dataset *ds)
736 {
737   if (!proc_in_temporary_transformations (ds))
738     {
739       add_case_limit_trns (ds);
740 
741       ds->permanent_dict = dict_clone (ds->dict);
742 
743       trns_chain_finalize (ds->permanent_trns_chain);
744       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
745       dataset_transformations_changed__ (ds, true);
746     }
747 }
748 
749 /* Converts all the temporary transformations, if any, to permanent
750    transformations.  Further transformations will be permanent.
751 
752    The FILTER command is implemented as a temporary transformation, so a
753    procedure that uses this function should usually use proc_open_filtering()
754    with FILTER false, instead of plain proc_open().
755 
756    Returns true if anything changed, false otherwise. */
757 bool
proc_make_temporary_transformations_permanent(struct dataset * ds)758 proc_make_temporary_transformations_permanent (struct dataset *ds)
759 {
760   if (proc_in_temporary_transformations (ds))
761     {
762       trns_chain_finalize (ds->temporary_trns_chain);
763       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
764       ds->temporary_trns_chain = NULL;
765 
766       ds->cur_trns_chain = ds->permanent_trns_chain;
767 
768       dict_unref (ds->permanent_dict);
769       ds->permanent_dict = NULL;
770 
771       return true;
772     }
773   else
774     return false;
775 }
776 
777 /* Cancels all temporary transformations, if any.  Further
778    transformations will be permanent.
779    Returns true if anything changed, false otherwise. */
780 bool
proc_cancel_temporary_transformations(struct dataset * ds)781 proc_cancel_temporary_transformations (struct dataset *ds)
782 {
783   if (proc_in_temporary_transformations (ds))
784     {
785       dict_unref (ds->dict);
786       ds->dict = ds->permanent_dict;
787       ds->permanent_dict = NULL;
788 
789       trns_chain_destroy (ds->temporary_trns_chain);
790       ds->temporary_trns_chain = NULL;
791       dataset_transformations_changed__ (
792         ds, !trns_chain_is_empty (ds->permanent_trns_chain));
793       return true;
794     }
795   else
796     return false;
797 }
798 
799 /* Cancels all transformations, if any.
800    Returns true if successful, false on I/O error. */
801 bool
proc_cancel_all_transformations(struct dataset * ds)802 proc_cancel_all_transformations (struct dataset *ds)
803 {
804   bool ok;
805   assert (ds->proc_state == PROC_COMMITTED);
806   ok = trns_chain_destroy (ds->permanent_trns_chain);
807   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
808   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
809   ds->temporary_trns_chain = NULL;
810   dataset_transformations_changed__ (ds, false);
811 
812   return ok;
813 }
814 
815 static int
store_case_num(void * var_,struct ccase ** cc,casenumber case_num)816 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
817 {
818   struct variable *var = var_;
819 
820   *cc = case_unshare (*cc);
821   case_data_rw (*cc, var)->f = case_num;
822 
823   return TRNS_CONTINUE;
824 }
825 
826 /* Add a variable which we can sort by to get back the original order. */
827 struct variable *
add_permanent_ordering_transformation(struct dataset * ds)828 add_permanent_ordering_transformation (struct dataset *ds)
829 {
830   struct variable *temp_var;
831 
832   temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
833   if (proc_in_temporary_transformations (ds))
834     {
835       struct variable *perm_var;
836 
837       perm_var = dict_clone_var_in_place_assert (ds->permanent_dict, temp_var);
838       trns_chain_append (ds->permanent_trns_chain, NULL, store_case_num,
839                          NULL, perm_var);
840       trns_chain_finalize (ds->permanent_trns_chain);
841     }
842   else
843     add_transformation (ds, store_case_num, NULL, temp_var);
844 
845   return temp_var;
846 }
847 
848 /* Causes output from the next procedure to be discarded, instead
849    of being preserved for use as input for the next procedure. */
850 void
proc_discard_output(struct dataset * ds)851 proc_discard_output (struct dataset *ds)
852 {
853   ds->discard_output = true;
854 }
855 
856 
857 /* Checks whether DS has a corrupted active dataset.  If so,
858    discards it and returns false.  If not, returns true without
859    doing anything. */
860 bool
dataset_end_of_command(struct dataset * ds)861 dataset_end_of_command (struct dataset *ds)
862 {
863   if (ds->source != NULL)
864     {
865       if (casereader_error (ds->source))
866         {
867           dataset_clear (ds);
868           return false;
869         }
870       else
871         {
872           const struct taint *taint = casereader_get_taint (ds->source);
873           taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
874           assert (!taint_has_tainted_successor (taint));
875         }
876     }
877   return true;
878 }
879 
880 static trns_proc_func case_limit_trns_proc;
881 static trns_free_func case_limit_trns_free;
882 
883 /* Adds a transformation that limits the number of cases that may
884    pass through, if DS->DICT has a case limit. */
885 static void
add_case_limit_trns(struct dataset * ds)886 add_case_limit_trns (struct dataset *ds)
887 {
888   casenumber case_limit = dict_get_case_limit (ds->dict);
889   if (case_limit != 0)
890     {
891       casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
892       *cases_remaining = case_limit;
893       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
894                           cases_remaining);
895       dict_set_case_limit (ds->dict, 0);
896     }
897 }
898 
899 /* Limits the maximum number of cases processed to
900    *CASES_REMAINING. */
901 static int
case_limit_trns_proc(void * cases_remaining_,struct ccase ** c UNUSED,casenumber case_nr UNUSED)902 case_limit_trns_proc (void *cases_remaining_,
903                       struct ccase **c UNUSED, casenumber case_nr UNUSED)
904 {
905   size_t *cases_remaining = cases_remaining_;
906   if (*cases_remaining > 0)
907     {
908       (*cases_remaining)--;
909       return TRNS_CONTINUE;
910     }
911   else
912     return TRNS_DROP_CASE;
913 }
914 
/* Releases the counter belonging to a case limit transformation.  Always
   returns true. */
static bool
case_limit_trns_free (void *cases_remaining_)
{
  free (cases_remaining_);
  return true;
}
923 
924 static trns_proc_func filter_trns_proc;
925 
926 /* Adds a temporary transformation to filter data according to
927    the variable specified on FILTER, if any. */
928 static void
add_filter_trns(struct dataset * ds)929 add_filter_trns (struct dataset *ds)
930 {
931   struct variable *filter_var = dict_get_filter (ds->dict);
932   if (filter_var != NULL)
933     {
934       proc_start_temporary_transformations (ds);
935       add_transformation (ds, filter_trns_proc, NULL, filter_var);
936     }
937 }
938 
939 /* FILTER transformation. */
940 static int
filter_trns_proc(void * filter_var_,struct ccase ** c,casenumber case_nr UNUSED)941 filter_trns_proc (void *filter_var_,
942                   struct ccase **c, casenumber case_nr UNUSED)
943 
944 {
945   struct variable *filter_var = filter_var_;
946   double f = case_num (*c, filter_var);
947   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
948           ? TRNS_CONTINUE : TRNS_DROP_CASE);
949 }
950 
951 
952 void
dataset_need_lag(struct dataset * ds,int n_before)953 dataset_need_lag (struct dataset *ds, int n_before)
954 {
955   ds->n_lag = MAX (ds->n_lag, n_before);
956 }
957 
958 static void
dataset_changed__(struct dataset * ds)959 dataset_changed__ (struct dataset *ds)
960 {
961   if (ds->callbacks != NULL && ds->callbacks->changed != NULL)
962     ds->callbacks->changed (ds->cb_data);
963 }
964 
965 static void
dataset_transformations_changed__(struct dataset * ds,bool non_empty)966 dataset_transformations_changed__ (struct dataset *ds, bool non_empty)
967 {
968   if (ds->callbacks != NULL && ds->callbacks->transformations_changed != NULL)
969     ds->callbacks->transformations_changed (non_empty, ds->cb_data);
970 }
971 
972 /* Private interface for use by session code. */
973 
974 void
dataset_set_session__(struct dataset * ds,struct session * session)975 dataset_set_session__ (struct dataset *ds, struct session *session)
976 {
977   ds->session = session;
978 }
979