1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2011 Free Software Foundation, Inc.
3 
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16 
17 #include <config.h>
18 
19 #include "data/casereader.h"
20 
21 #include <stdlib.h>
22 
23 #include "data/casereader-provider.h"
24 #include "data/casewriter.h"
25 #include "data/variable.h"
26 #include "data/dictionary.h"
27 #include "libpspp/taint.h"
28 #include "libpspp/message.h"
29 
30 #include "gl/xalloc.h"
31 
32 #include "gettext.h"
33 #define _(msgid) gettext (msgid)
34 
35 /* A casereader that filters data coming from another
36    casereader. */
37 struct casereader_filter
38   {
39     struct casereader *subreader; /* The reader to filter. */
40     bool (*include) (const struct ccase *, void *aux);
41     bool (*destroy) (void *aux);
42     void *aux;
43     struct casewriter *exclude; /* Writer that gets filtered cases, or NULL. */
44   };
45 
46 static const struct casereader_class casereader_filter_class;
47 
48 /* Creates and returns a casereader whose content is a filtered
49    version of the data in SUBREADER.  Only the cases for which
50    INCLUDE returns true will appear in the returned casereader,
51    in the original order.
52 
53    If EXCLUDE is non-null, then cases for which INCLUDE returns
54    false are written to EXCLUDE.  These cases will not
55    necessarily be fully written to EXCLUDE until the filtering casereader's
56    cases have been fully read or, if that never occurs, until the
57    filtering casereader is destroyed.
58 
59    When the filtering casereader is destroyed, DESTROY will be
60    called to allow any state maintained by INCLUDE to be freed.
61 
62    After this function is called, SUBREADER must not ever again
63    be referenced directly.  It will be destroyed automatically
64    when the filtering casereader is destroyed. */
65 struct casereader *
casereader_create_filter_func(struct casereader * subreader,bool (* include)(const struct ccase *,void * aux),bool (* destroy)(void * aux),void * aux,struct casewriter * exclude)66 casereader_create_filter_func (struct casereader *subreader,
67                                bool (*include) (const struct ccase *,
68                                                 void *aux),
69                                bool (*destroy) (void *aux),
70                                void *aux,
71                                struct casewriter *exclude)
72 {
73   struct casereader_filter *filter = xmalloc (sizeof *filter);
74   struct casereader *reader;
75   filter->subreader = casereader_rename (subreader);
76   filter->include = include;
77   filter->destroy = destroy;
78   filter->aux = aux;
79   filter->exclude = exclude;
80   reader = casereader_create_sequential (
81     NULL, casereader_get_proto (filter->subreader), CASENUMBER_MAX,
82     &casereader_filter_class, filter);
83   taint_propagate (casereader_get_taint (filter->subreader),
84                    casereader_get_taint (reader));
85   return reader;
86 }
87 
88 /* Internal read function for filtering casereader. */
89 static struct ccase *
casereader_filter_read(struct casereader * reader UNUSED,void * filter_)90 casereader_filter_read (struct casereader *reader UNUSED, void *filter_)
91 
92 {
93   struct casereader_filter *filter = filter_;
94   for (;;)
95     {
96       struct ccase *c = casereader_read (filter->subreader);
97       if (c == NULL)
98         return NULL;
99       else if (filter->include (c, filter->aux))
100         return c;
101       else if (filter->exclude != NULL)
102         casewriter_write (filter->exclude, c);
103       else
104         case_unref (c);
105     }
106 }
107 
108 /* Internal destruction function for filtering casereader. */
109 static void
casereader_filter_destroy(struct casereader * reader,void * filter_)110 casereader_filter_destroy (struct casereader *reader, void *filter_)
111 {
112   struct casereader_filter *filter = filter_;
113 
114   /* Make sure we've written everything to the excluded cases
115      casewriter, if there is one. */
116   if (filter->exclude != NULL)
117     {
118       struct ccase *c;
119       while ((c = casereader_read (filter->subreader)) != NULL)
120         if (filter->include (c, filter->aux))
121           case_unref (c);
122         else
123           casewriter_write (filter->exclude, c);
124     }
125 
126   casereader_destroy (filter->subreader);
127   if (filter->destroy != NULL && !filter->destroy (filter->aux))
128     casereader_force_error (reader);
129   free (filter);
130 }
131 
132 /* Filtering casereader class. */
133 static const struct casereader_class casereader_filter_class =
134   {
135     casereader_filter_read,
136     casereader_filter_destroy,
137 
138     /* We could in fact delegate clone to the subreader, if the
139        filter function is required to have no memory and if we
140        added reference counting.  But it might be useful to have
141        filter functions with memory and in any case this would
142        require a little extra work. */
143     NULL,
144     NULL,
145   };
146 
147 
/* Casereader that keeps only cases with a valid weight. */

/* Auxiliary data for the weight filter. */
struct casereader_filter_weight
{
  /* Variable that holds each case's weight. */
  const struct variable *weight_var;

  /* Points to the flag that says whether a warning should still be
     issued for an invalid weight.  The flag is cleared once the
     warning has been issued. */
  bool *warn_on_invalid;

  /* Storage for the flag when the caller supplies none; in that
     case WARN_ON_INVALID points here. */
  bool local_warn_on_invalid;
};

static bool casereader_filter_weight_include (const struct ccase *c,
                                              void *cfw_);
static bool casereader_filter_weight_destroy (void *cfw_);
160 
161 /* Creates and returns a casereader that filters cases from
162    READER by valid weights, that is, any cases with user- or
163    system-missing, zero, or negative weights are dropped.  The
164    weight variable's information is taken from DICT.  If DICT
165    does not have a weight variable, then no cases are filtered
166    out.
167 
168    When a case with an invalid weight is encountered,
169    *WARN_ON_INVALID is checked.  If it is true, then an error
170    message is issued and *WARN_ON_INVALID is set false.  If
171    WARN_ON_INVALID is a null pointer, then an internal bool that
172    is initially true is used instead of a caller-supplied bool.
173 
174    If EXCLUDE is non-null, then dropped cases are written to
175    EXCLUDE.  These cases will not necessarily be fully written to
176    EXCLUDE until the filtering casereader's cases have been fully
177    read or, if that never occurs, until the filtering casereader
178    is destroyed.
179 
180    After this function is called, READER must not ever again be
181    referenced directly.  It will be destroyed automatically when
182    the filtering casereader is destroyed. */
183 struct casereader *
casereader_create_filter_weight(struct casereader * reader,const struct dictionary * dict,bool * warn_on_invalid,struct casewriter * exclude)184 casereader_create_filter_weight (struct casereader *reader,
185                                  const struct dictionary *dict,
186                                  bool *warn_on_invalid,
187                                  struct casewriter *exclude)
188 {
189   struct variable *weight_var = dict_get_weight (dict);
190   if (weight_var != NULL)
191     {
192       struct casereader_filter_weight *cfw = xmalloc (sizeof *cfw);
193       cfw->weight_var = weight_var;
194       cfw->warn_on_invalid = (warn_on_invalid
195                                ? warn_on_invalid
196                                : &cfw->local_warn_on_invalid);
197       cfw->local_warn_on_invalid = true;
198       reader = casereader_create_filter_func (reader,
199                                               casereader_filter_weight_include,
200                                               casereader_filter_weight_destroy,
201                                               cfw, exclude);
202     }
203   else
204     reader = casereader_rename (reader);
205   return reader;
206 }
207 
208 /* Internal "include" function for weight-filtering
209    casereader. */
210 static bool
casereader_filter_weight_include(const struct ccase * c,void * cfw_)211 casereader_filter_weight_include (const struct ccase *c, void *cfw_)
212 {
213   struct casereader_filter_weight *cfw = cfw_;
214   double value = case_num (c, cfw->weight_var);
215   if (value >= 0.0 && !var_is_num_missing (cfw->weight_var, value, MV_ANY))
216     return true;
217   else
218     {
219       if (*cfw->warn_on_invalid)
220         {
221 	  msg (SW, _("At least one case in the data read had a weight value "
222 		     "that was user-missing, system-missing, zero, or "
223 		     "negative.  These case(s) were ignored."));
224           *cfw->warn_on_invalid = false;
225         }
226       return false;
227     }
228 }
229 
/* "destroy" callback for the weight-filtering casereader.

   Frees the auxiliary data CFW_.  Always returns true. */
static bool
casereader_filter_weight_destroy (void *cfw_)
{
  free (cfw_);
  return true;
}
239 
240 /* Casereader for filtering missing values. */
241 
242 /* Missing-value filtering data. */
243 struct casereader_filter_missing
244   {
245     struct variable **vars;     /* Variables whose values to filter. */
246     size_t var_cnt;             /* Number of variables. */
247     enum mv_class class;        /* Types of missing values to filter. */
248     casenumber *n_missing;
249   };
250 
251 static bool casereader_filter_missing_include (const struct ccase *, void *);
252 static bool casereader_filter_missing_destroy (void *);
253 
254 /* Creates and returns a casereader that filters out cases from
255    READER that have a missing value in the given CLASS for any of
256    the VAR_CNT variables in VARS.  Only cases that have
257    non-missing values for all of these variables are passed
258    through.
259 
260    Ownership of VARS is retained by the caller.
261 
262    If EXCLUDE is non-null, then dropped cases are written to
263    EXCLUDE.  These cases will not necessarily be fully written to
264    EXCLUDE until the filtering casereader's cases have been fully
265    read or, if that never occurs, until the filtering casereader
266    is destroyed.
267 
268    If N_MISSING is non-null, then after reading, it will be filled
269    with the total number of dropped cases.
270 
271    After this function is called, READER must not ever again
272    be referenced directly.  It will be destroyed automatically
273    when the filtering casereader is destroyed. */
274 struct casereader *
casereader_create_filter_missing(struct casereader * reader,const struct variable * const * vars,size_t var_cnt,enum mv_class class,casenumber * n_missing,struct casewriter * exclude)275 casereader_create_filter_missing (struct casereader *reader,
276                                   const struct variable *const*vars, size_t var_cnt,
277                                   enum mv_class class,
278 				  casenumber *n_missing,
279                                   struct casewriter *exclude)
280 {
281   if (var_cnt > 0 && class != MV_NEVER)
282     {
283       struct casereader_filter_missing *cfm = xmalloc (sizeof *cfm);
284       cfm->vars = xmemdup (vars, sizeof *vars * var_cnt);
285       cfm->var_cnt = var_cnt;
286       cfm->class = class;
287       cfm->n_missing = n_missing;
288       if (n_missing) *n_missing = 0;
289       return casereader_create_filter_func (reader,
290                                             casereader_filter_missing_include,
291                                             casereader_filter_missing_destroy,
292                                             cfm,
293                                             exclude);
294     }
295   else
296     return casereader_rename (reader);
297 }
298 
299 /* Internal "include" function for missing value-filtering
300    casereader. */
301 static bool
casereader_filter_missing_include(const struct ccase * c,void * cfm_)302 casereader_filter_missing_include (const struct ccase *c, void *cfm_)
303 {
304   const struct casereader_filter_missing *cfm = cfm_;
305   size_t i;
306 
307   for (i = 0; i < cfm->var_cnt; i++)
308     {
309       struct variable *var = cfm->vars[i];
310       const union value *value = case_data (c, var);
311       if (var_is_value_missing (var, value, cfm->class))
312 	{
313 	  if (cfm->n_missing)
314 	    (*cfm->n_missing)++;
315 	  return false;
316 	}
317     }
318   return true;
319 }
320 
321 /* Internal "destroy" function for missing value-filtering
322    casereader. */
323 static bool
casereader_filter_missing_destroy(void * cfm_)324 casereader_filter_missing_destroy (void *cfm_)
325 {
326   struct casereader_filter_missing *cfm = cfm_;
327   free (cfm->vars);
328   free (cfm);
329   return true;
330 }
331 
332 /* Case-counting casereader. */
333 
334 static bool casereader_counter_include (const struct ccase *, void *);
335 
336 /* Creates and returns a new casereader that counts the number of
337    cases that have been read from it.  *COUNTER is initially set
338    to INITIAL_VALUE, then incremented by 1 each time a case is read.
339 
340    Counting casereaders must be used very cautiously: if a
341    counting casereader is cloned or if the casereader_peek
342    function is used on it, then the counter's value can be higher
343    than expected because of the buffering that goes on behind the
344    scenes.
345 
346    The counter is only incremented as cases are actually read
347    from the casereader.  In particular, if the casereader is
348    destroyed before all cases have been read from the casereader,
349    cases never read will not be included in the count.
350 
351    After this function is called, READER must not ever again
352    be referenced directly.  It will be destroyed automatically
353    when the filtering casereader is destroyed. */
354 struct casereader *
casereader_create_counter(struct casereader * reader,casenumber * counter,casenumber initial_value)355 casereader_create_counter (struct casereader *reader, casenumber *counter,
356                            casenumber initial_value)
357 {
358   *counter = initial_value;
359   return casereader_create_filter_func (reader, casereader_counter_include,
360                                         NULL, counter, NULL);
361 }
362 
363 /* Internal "include" function for counting casereader. */
364 static bool
casereader_counter_include(const struct ccase * c UNUSED,void * counter_)365 casereader_counter_include (const struct ccase *c UNUSED, void *counter_)
366 {
367   casenumber *counter = counter_;
368   ++*counter;
369   return true;
370 }
371