1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2011 Free Software Foundation, Inc.
3 
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16 
17 #include <config.h>
18 
19 #include <stdlib.h>
20 
21 #include "data/casereader-provider.h"
22 #include "data/casereader.h"
23 #include "data/val-type.h"
24 #include "data/variable.h"
25 #include "libpspp/taint.h"
26 
27 #include "gl/xalloc.h"
28 
29 /* Casereader that applies a user-supplied function to translate
30    each case into another in an arbitrary fashion. */
31 
32 /* A translating casereader. */
33 struct casereader_translator
34   {
35     struct casereader *subreader; /* Source of input cases. */
36 
37     struct ccase *(*translate) (struct ccase *input, void *aux);
38     bool (*destroy) (void *aux);
39     void *aux;
40   };
41 
42 static const struct casereader_class casereader_translator_class;
43 
44 /* Creates and returns a new casereader whose cases are produced
45    by reading from SUBREADER and passing through TRANSLATE, which
46    must return the translated case, and populate it based on
47    INPUT and auxiliary data AUX.  TRANSLATE must destroy its
48    input case.
49 
50    TRANSLATE may be stateful, that is, the output for a given
51    case may depend on previous cases.  If TRANSLATE is stateless,
52    then you may want to use casereader_translate_stateless
53    instead, since it sometimes performs better.
54 
55    The cases returned by TRANSLATE must match OUTPUT_PROTO.
56 
57    When the translating casereader is destroyed, DESTROY will be
58    called to allow any state maintained by TRANSLATE to be freed.
59 
60    After this function is called, SUBREADER must not ever again
61    be referenced directly.  It will be destroyed automatically
62    when the translating casereader is destroyed. */
63 struct casereader *
casereader_create_translator(struct casereader * subreader,const struct caseproto * output_proto,struct ccase * (* translate)(struct ccase * input,void * aux),bool (* destroy)(void * aux),void * aux)64 casereader_create_translator (struct casereader *subreader,
65                               const struct caseproto *output_proto,
66                               struct ccase *(*translate) (struct ccase *input,
67                                                           void *aux),
68                               bool (*destroy) (void *aux),
69                               void *aux)
70 {
71   struct casereader_translator *ct = xmalloc (sizeof *ct);
72   struct casereader *reader;
73   ct->subreader = casereader_rename (subreader);
74   ct->translate = translate;
75   ct->destroy = destroy;
76   ct->aux = aux;
77   reader = casereader_create_sequential (
78     NULL, output_proto, casereader_get_case_cnt (ct->subreader),
79     &casereader_translator_class, ct);
80   taint_propagate (casereader_get_taint (ct->subreader),
81                    casereader_get_taint (reader));
82   return reader;
83 }
84 
85 /* Internal read function for translating casereader. */
86 static struct ccase *
casereader_translator_read(struct casereader * reader UNUSED,void * ct_)87 casereader_translator_read (struct casereader *reader UNUSED,
88                             void *ct_)
89 {
90   struct casereader_translator *ct = ct_;
91   struct ccase *tmp = casereader_read (ct->subreader);
92   if (tmp)
93     tmp = ct->translate (tmp, ct->aux);
94   return tmp;
95 }
96 
97 /* Internal destroy function for translating casereader. */
98 static void
casereader_translator_destroy(struct casereader * reader UNUSED,void * ct_)99 casereader_translator_destroy (struct casereader *reader UNUSED, void *ct_)
100 {
101   struct casereader_translator *ct = ct_;
102   casereader_destroy (ct->subreader);
103   ct->destroy (ct->aux);
104   free (ct);
105 }
106 
107 /* Casereader class for translating casereader. */
108 static const struct casereader_class casereader_translator_class =
109   {
110     casereader_translator_read,
111     casereader_translator_destroy,
112     NULL,
113     NULL,
114   };
115 
116 /* Casereader that applies a user-supplied function to translate
117    each case into another in a stateless fashion. */
118 
119 /* A statelessly translating casereader. */
120 struct casereader_stateless_translator
121   {
122     struct casereader *subreader; /* Source of input cases. */
123 
124     casenumber case_offset;
125     struct ccase *(*translate) (struct ccase *input, casenumber,
126                                 const void *aux);
127     bool (*destroy) (void *aux);
128     void *aux;
129   };
130 
131 static const struct casereader_random_class
132 casereader_stateless_translator_class;
133 
134 /* Creates and returns a new casereader whose cases are produced by reading
135    from SUBREADER and passing through the TRANSLATE function.  TRANSLATE must
136    takes ownership of its input case and returns a translated case, populating
137    the translated case based on INPUT and auxiliary data AUX.
138 
139    TRANSLATE must be stateless, that is, the output for a given case must not
140    depend on previous cases.  This is because cases may be retrieved in
141    arbitrary order, and some cases may be retrieved multiple times, and some
142    cases may be skipped and never retrieved at all.  If TRANSLATE is stateful,
143    use casereader_create_translator instead.
144 
145    The casenumber argument to the TRANSLATE function is the absolute case
146    number in SUBREADER, that is, 0 when the first case in SUBREADER is being
147    translated, 1 when the second case is being translated, and so on.
148 
149    The cases returned by TRANSLATE must match OUTPUT_PROTO.
150 
151    When the stateless translating casereader is destroyed, DESTROY will be
152    called to allow any auxiliary data maintained by TRANSLATE to be freed.
153 
154    After this function is called, SUBREADER must not ever again be referenced
155    directly.  It will be destroyed automatically when the translating
156    casereader is destroyed. */
157 struct casereader *
casereader_translate_stateless(struct casereader * subreader,const struct caseproto * output_proto,struct ccase * (* translate)(struct ccase * input,casenumber,const void * aux),bool (* destroy)(void * aux),void * aux)158 casereader_translate_stateless (
159   struct casereader *subreader,
160   const struct caseproto *output_proto,
161   struct ccase *(*translate) (struct ccase *input, casenumber,
162                               const void *aux),
163   bool (*destroy) (void *aux),
164   void *aux)
165 {
166   struct casereader_stateless_translator *cst = xmalloc (sizeof *cst);
167   struct casereader *reader;
168   cst->subreader = casereader_rename (subreader);
169   cst->translate = translate;
170   cst->destroy = destroy;
171   cst->aux = aux;
172   reader = casereader_create_random (
173     output_proto, casereader_get_case_cnt (cst->subreader),
174     &casereader_stateless_translator_class, cst);
175   taint_propagate (casereader_get_taint (cst->subreader),
176                    casereader_get_taint (reader));
177   return reader;
178 }
179 
180 /* Internal read function for stateless translating casereader. */
181 static struct ccase *
casereader_stateless_translator_read(struct casereader * reader UNUSED,void * cst_,casenumber idx)182 casereader_stateless_translator_read (struct casereader *reader UNUSED,
183                                       void *cst_, casenumber idx)
184 {
185   struct casereader_stateless_translator *cst = cst_;
186   struct ccase *tmp = casereader_peek (cst->subreader, idx);
187   if (tmp != NULL)
188     tmp = cst->translate (tmp, cst->case_offset + idx, cst->aux);
189   return tmp;
190 }
191 
192 /* Internal destroy function for translating casereader. */
193 static void
casereader_stateless_translator_destroy(struct casereader * reader UNUSED,void * cst_)194 casereader_stateless_translator_destroy (struct casereader *reader UNUSED,
195                                          void *cst_)
196 {
197   struct casereader_stateless_translator *cst = cst_;
198   casereader_destroy (cst->subreader);
199   cst->destroy (cst->aux);
200   free (cst);
201 }
202 
203 static void
casereader_stateless_translator_advance(struct casereader * reader UNUSED,void * cst_,casenumber cnt)204 casereader_stateless_translator_advance (struct casereader *reader UNUSED,
205                                          void *cst_, casenumber cnt)
206 {
207   struct casereader_stateless_translator *cst = cst_;
208   cst->case_offset += casereader_advance (cst->subreader, cnt);
209 }
210 
211 /* Casereader class for stateless translating casereader. */
212 static const struct casereader_random_class
213 casereader_stateless_translator_class =
214   {
215     casereader_stateless_translator_read,
216     casereader_stateless_translator_destroy,
217     casereader_stateless_translator_advance,
218   };
219 
220 
221 struct casereader_append_numeric
222 {
223   struct caseproto *proto;
224   casenumber n;
225   new_value_func *func;
226   void *aux;
227   void (*destroy) (void *aux);
228 };
229 
230 static bool can_destroy (void *can_);
231 
232 static struct ccase *can_translate (struct ccase *, void *can_);
233 
234 /* Creates and returns a new casereader whose cases are produced
235    by reading from SUBREADER and appending an additional value,
236    generated by FUNC.  AUX is an optional parameter which
237    gets passed to FUNC. FUNC will also receive N as it, which is
238    the ordinal number of the case in the reader.  DESTROY is an
239    optional parameter used to destroy AUX.
240 
241    After this function is called, SUBREADER must not ever again
242    be referenced directly.  It will be destroyed automatically
243    when the translating casereader is destroyed. */
244 struct casereader *
casereader_create_append_numeric(struct casereader * subreader,new_value_func func,void * aux,void (* destroy)(void * aux))245 casereader_create_append_numeric (struct casereader *subreader,
246 				  new_value_func func, void *aux,
247 				  void (*destroy) (void *aux))
248 {
249   struct casereader_append_numeric *can = xmalloc (sizeof *can);
250   can->proto = caseproto_ref (casereader_get_proto (subreader));
251   can->proto = caseproto_add_width (can->proto, 0);
252   can->n = 0;
253   can->aux = aux;
254   can->func = func;
255   can->destroy = destroy;
256   return casereader_create_translator (subreader, can->proto,
257                                        can_translate, can_destroy, can);
258 }
259 
260 
261 static struct ccase *
can_translate(struct ccase * c,void * can_)262 can_translate (struct ccase *c, void *can_)
263 {
264   struct casereader_append_numeric *can = can_;
265   double new_value = can->func (c, can->n++, can->aux);
266   c = case_unshare_and_resize (c, can->proto);
267   case_data_rw_idx (c, caseproto_get_n_widths (can->proto) - 1)->f = new_value;
268   return c;
269 }
270 
271 static bool
can_destroy(void * can_)272 can_destroy (void *can_)
273 {
274   struct casereader_append_numeric *can = can_;
275   if (can->destroy)
276     can->destroy (can->aux);
277   caseproto_unref (can->proto);
278   free (can);
279   return true;
280 }
281 
282 
283 
/* Parameters of an arithmetic sequence FIRST, FIRST + INCREMENT,
   FIRST + 2 * INCREMENT, ... */
struct arithmetic_sequence
{
  double first;         /* Value of term number 0. */
  double increment;     /* Difference between successive terms. */
};
289 
290 static double
next_arithmetic(const struct ccase * c UNUSED,casenumber n,void * aux)291 next_arithmetic (const struct ccase *c UNUSED,
292 		 casenumber n,
293 		 void *aux)
294 {
295   struct arithmetic_sequence *as = aux;
296   return n * as->increment + as->first;
297 }
298 
299 /* Creates and returns a new casereader whose cases are produced
300    by reading from SUBREADER and appending an additional value,
301    which takes the value FIRST in the first case, FIRST +
302    INCREMENT in the second case, FIRST + INCREMENT * 2 in the
303    third case, and so on.
304 
305    After this function is called, SUBREADER must not ever again
306    be referenced directly.  It will be destroyed automatically
307    when the translating casereader is destroyed. */
308 struct casereader *
casereader_create_arithmetic_sequence(struct casereader * subreader,double first,double increment)309 casereader_create_arithmetic_sequence (struct casereader *subreader,
310                                        double first, double increment)
311 {
312   struct arithmetic_sequence *as = xzalloc (sizeof *as);
313   as->first = first;
314   as->increment = increment;
315   return casereader_create_append_numeric (subreader, next_arithmetic,
316 					   as, free);
317 }
318 
319 
320 
321 
322 struct casereader_append_rank
323 {
324   struct casereader *clone;
325   casenumber n;
326   const struct variable *var;
327   const struct variable *weight;
328   struct caseproto *proto;
329   casenumber n_common;
330   double mean_rank;
331   double cc;
332   distinct_func *distinct;
333   void *aux;
334   enum rank_error *err;
335   double prev_value;
336 };
337 
338 static bool car_destroy (void *car_);
339 
340 static struct ccase *car_translate (struct ccase *input, void *car_);
341 
342 /* Creates and returns a new casereader whose cases are produced
343    by reading from SUBREADER and appending an additional value,
344    which is the rank of the observation.   W is the weight variable
345    of the dictionary containing V, or NULL if there is no weight
346    variable.
347 
348    The following preconditions must be met:
349 
350    1.    SUBREADER must be sorted on V.
351 
352    2.    The weight variables, must be non-negative.
353 
354    If either of these preconditions are not satisfied, then the rank
355    variables may not be correct.  In this case, if ERR is non-null,
356    it will be set according to the erroneous conditions encountered.
357 
358    If DISTINCT_CALLBACK is non-null, then  it will be called exactly
359    once for every case containing a distinct value of V.  AUX is
360    an auxiliary pointer passed to DISTINCT_CALLBACK.
361 
362    After this function is called, SUBREADER must not ever again
363    be referenced directly.  It will be destroyed automatically
364    when the translating casereader is destroyed. */
365 struct casereader *
casereader_create_append_rank(struct casereader * subreader,const struct variable * v,const struct variable * w,enum rank_error * err,distinct_func * distinct_callback,void * aux)366 casereader_create_append_rank (struct casereader *subreader,
367 			       const struct variable *v,
368 			       const struct variable *w,
369 			       enum rank_error *err,
370 			       distinct_func *distinct_callback,
371 			       void *aux
372 			)
373 {
374   struct casereader_append_rank *car = xmalloc (sizeof *car);
375   car->proto = caseproto_ref (casereader_get_proto (subreader));
376   car->proto = caseproto_add_width (car->proto, 0);
377   car->weight = w;
378   car->var = v;
379   car->n = 0;
380   car->n_common = 1;
381   car->cc = 0.0;
382   car->clone = casereader_clone (subreader);
383   car->distinct = distinct_callback;
384   car->aux = aux;
385   car->err = err;
386   car->prev_value = SYSMIS;
387 
388   return casereader_create_translator (subreader, car->proto,
389                                        car_translate, car_destroy, car);
390 }
391 
392 
393 static bool
car_destroy(void * car_)394 car_destroy (void *car_)
395 {
396   struct casereader_append_rank *car = car_;
397   casereader_destroy (car->clone);
398   caseproto_unref (car->proto);
399   free (car);
400   return true;
401 }
402 
403 static struct ccase *
car_translate(struct ccase * input,void * car_)404 car_translate (struct ccase *input, void *car_)
405 {
406   struct casereader_append_rank *car = car_;
407 
408   const double value = case_data (input, car->var)->f;
409 
410   if (car->prev_value != SYSMIS)
411     {
412       if (car->err && value < car->prev_value)
413 	*car->err |= RANK_ERR_UNSORTED;
414     }
415 
416   if (car->n_common == 1)
417     {
418       double vxx = SYSMIS;
419       casenumber k = 0;
420       double weight = 1.0;
421       if (car->weight)
422 	{
423 	  weight = case_data (input, car->weight)->f;
424 	  if (car->err && weight < 0)
425 	    *car->err |= RANK_ERR_NEGATIVE_WEIGHT;
426 	}
427 
428       do
429 	{
430 	  struct ccase *c = casereader_peek (car->clone, car->n + ++k);
431 	  if (c == NULL)
432 	    break;
433 	  vxx = case_data (c, car->var)->f;
434 
435 	  if (vxx == value)
436 	    {
437 	      if (car->weight)
438 		{
439 		  double w = case_data (c, car->weight)->f;
440 
441 		  if (car->err && w < 0)
442 		    *car->err |= RANK_ERR_NEGATIVE_WEIGHT;
443 
444 		  weight += w;
445 		}
446 	      else
447 		weight += 1.0;
448 	      car->n_common++;
449 	    }
450           case_unref (c);
451 	}
452       while (vxx == value);
453       car->mean_rank = car->cc + (weight + 1) / 2.0;
454       car->cc += weight;
455 
456       if (car->distinct)
457 	car->distinct (value, car->n_common, weight, car->aux);
458     }
459   else
460     car->n_common--;
461 
462   car->n++;
463 
464   input = case_unshare_and_resize (input, car->proto);
465   case_data_rw_idx (input, caseproto_get_n_widths (car->proto) - 1)->f
466     = car->mean_rank;
467   car->prev_value = value;
468   return input;
469 }
470 
471 
472 
473 
474 struct consolidator
475 {
476   const struct variable *key;
477   const struct variable *weight;
478   double cc;
479   double prev_cc;
480 
481   casenumber n;
482   struct casereader *clone;
483   struct caseproto *proto;
484   int direction;
485 };
486 
487 static bool
uniquify(const struct ccase * c,void * aux)488 uniquify (const struct ccase *c, void *aux)
489 {
490   struct consolidator *cdr = aux;
491   const union value *current_value = case_data (c, cdr->key);
492   const int key_width = var_get_width (cdr->key);
493   const double weight = cdr->weight ? case_data (c, cdr->weight)->f : 1.0;
494   struct ccase *next_case = casereader_peek (cdr->clone, cdr->n + 1);
495   int dir = 0;
496 
497   cdr->n ++;
498   cdr->cc += weight;
499 
500   if (NULL == next_case)
501       goto end;
502 
503   dir = value_compare_3way (case_data (next_case, cdr->key),
504 			    current_value, key_width);
505   case_unref (next_case);
506   if (dir != 0)
507     {
508       /* Insist that the data are sorted */
509       assert (cdr->direction == 0 || dir == cdr->direction);
510       cdr->direction = dir;
511       goto end;
512     }
513 
514   return false;
515 
516  end:
517   cdr->prev_cc = cdr->cc;
518   cdr->cc = 0;
519   return true;
520 }
521 
522 
523 
524 static struct ccase *
consolodate_weight(struct ccase * input,void * aux)525 consolodate_weight (struct ccase *input, void *aux)
526 {
527   struct consolidator *cdr = aux;
528   struct ccase *c;
529 
530   if (cdr->weight)
531     {
532       c = case_unshare (input);
533       case_data_rw (c, cdr->weight)->f = cdr->prev_cc;
534     }
535   else
536     {
537       c = case_unshare_and_resize (input, cdr->proto);
538       case_data_rw_idx (c, caseproto_get_n_widths (cdr->proto) - 1)->f = cdr->prev_cc;
539     }
540 
541   return c;
542 }
543 
544 
545 static bool
uniquify_destroy(void * aux)546 uniquify_destroy (void *aux)
547 {
548   struct consolidator *cdr = aux;
549 
550   casereader_destroy (cdr->clone);
551   caseproto_unref (cdr->proto);
552   free (cdr);
553 
554   return true;
555 }
556 
557 
558 
559 /* Returns a new casereader which is based upon INPUT, but which contains a maximum
560    of one case for each distinct value of KEY.
561    If WEIGHT is non-null, then the new casereader's values for this variable
562    will be the sum of all values matching KEY.
563    IF WEIGHT is null, then the new casereader will have an additional numeric
564    value appended, which will contain the total number of cases containing
565    KEY.
566    INPUT must be sorted on KEY
567 */
568 struct casereader *
casereader_create_distinct(struct casereader * input,const struct variable * key,const struct variable * weight)569 casereader_create_distinct (struct casereader *input,
570 					       const struct variable *key,
571 					       const struct variable *weight)
572 {
573   struct casereader *u ;
574   struct casereader *ud ;
575   struct caseproto *output_proto = caseproto_ref (casereader_get_proto (input));
576 
577   struct consolidator *cdr = xmalloc (sizeof (*cdr));
578   cdr->n = 0;
579   cdr->key = key;
580   cdr->weight = weight;
581   cdr->cc = 0;
582   cdr->clone = casereader_clone (input);
583   cdr->direction = 0;
584 
585   if (NULL == cdr->weight)
586     output_proto = caseproto_add_width (output_proto, 0);
587 
588   cdr->proto = output_proto;
589 
590   u = casereader_create_filter_func (input, uniquify,
591 				     NULL, cdr, NULL);
592 
593   ud = casereader_create_translator (u,
594 				     output_proto,
595 				     consolodate_weight,
596 				     uniquify_destroy,
597 				     cdr);
598 
599   return ud;
600 }
601 
602