1#!/usr/local/bin/python3.8
2from __future__ import print_function
3import shutil
4import unittest
5import glob
6import random
7from ruffus import follows, split, mkdir, files, transform, suffix, posttask, touch_file, merge, Pipeline
8import sys
9import os
10
11
12
# Amount of data generated, and how many lines go into each chunk file.
NUMBER_OF_RANDOMS = 10000
CHUNK_SIZE = 1000


# This script's path with the extension removed; reused below.
_script_root = os.path.splitext(__file__)[0]

# Scratch directory: the extension-less script path, relative to the current
# working directory, with a trailing slash so names can simply be appended.
tempdir = os.path.relpath(os.path.abspath(_script_root)) + "/"

# add grandparent to search path for testing
grandparent_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))
sys.path.insert(0, grandparent_dir)

# module name = script name without extension
module_name = os.path.basename(_script_root)
26
27
28# ---------------------------------------------------------------
29#
30#   Create random numbers
31#
@follows(mkdir(tempdir))
@files(None, tempdir + "random_numbers.list")
def create_random_numbers(input_file_name, output_file_name):
    """
    Write NUMBER_OF_RANDOMS random values, one per line, to output_file_name.

    Each value is ``random.random() * 100.0`` formatted with ``%g``.
    input_file_name is not used by the body (the task is registered with
    ``None`` as its input).
    """
    # `with` guarantees the handle is closed even if a write fails part-way.
    with open(output_file_name, "w") as f:
        for _ in range(NUMBER_OF_RANDOMS):
            f.write("%g\n" % (random.random() * 100.0))
39
40# ---------------------------------------------------------------
41#
42#   Split initial file
43#
44
45
@follows(create_random_numbers)
@split(tempdir + "random_numbers.list", tempdir + "*.chunks")
def step_4_split_numbers_into_chunks(input_file_name, output_files):
    """
    Split the random numbers file into files of CHUNK_SIZE lines each.

    Chunks are written into tempdir as "1.chunks", "2.chunks", ...; the last
    chunk holds whatever lines remain. output_files is not used by the body.
    """
    #
    #   clean up files from previous runs
    #
    #   Chunks are written into tempdir, so stale ones must be removed from
    #   there rather than from the current working directory.
    #
    for stale_chunk in glob.glob(tempdir + "*.chunks"):
        os.unlink(stale_chunk)
    #
    #   create new file every CHUNK_SIZE lines and
    #       copy each line into current file
    #
    output_file = None
    cnt_files = 0
    try:
        with open(input_file_name) as ii:
            for i, line in enumerate(ii):
                if i % CHUNK_SIZE == 0:
                    cnt_files += 1
                    if output_file:
                        output_file.close()
                    output_file = open(tempdir + "%d.chunks" % cnt_files, "w")
                output_file.write(line)
    finally:
        # Close the last chunk even when reading or writing failed.
        if output_file:
            output_file.close()
73
74# ---------------------------------------------------------------
75#
76#   Calculate sum and sum of squares for each chunk file
77#
78
79
@transform(step_4_split_numbers_into_chunks, suffix(".chunks"), ".sums")
def step_5_calculate_sum_of_squares(input_file_name, output_file_name):
    """
    Summarise one chunk file as three lines: the sum of squares, the sum and
    the number of values read from it.
    """
    # The summary file is opened (and truncated) before the chunk is read.
    with open(output_file_name, "w") as summary:
        total_of_squares = 0.0
        total = 0.0
        n_values = 0
        with open(input_file_name) as chunk:
            for n_values, text in enumerate(chunk, 1):
                value = float(text.rstrip())
                total_of_squares += value * value
                total += value
        report = "%s\n%s\n%d\n" % (repr(total_of_squares), repr(total), n_values)
        summary.write(report)
92
93
def print_hooray_again():
    """Print the indented "hooray again" line to standard output."""
    message = "     hooray again"
    print(message)
96
97
def print_whoppee_again():
    """Print the indented "whoppee again" line to standard output."""
    message = "     whoppee again"
    print(message)
100
101
102# ---------------------------------------------------------------
103#
#   Combine the per-chunk sums into the overall variance
105#
@posttask(lambda: sys.stdout.write("     hooray\n"))
@posttask(print_hooray_again, print_whoppee_again, touch_file(os.path.join(tempdir, "done")))
@merge(step_5_calculate_sum_of_squares, os.path.join(tempdir, "variance.result"))
def step_6_calculate_variance(input_file_names, output_file_name):
    """
    Calculate variance naively

    Every input file must contain exactly three lines, each parseable as a
    float: the sum of squares, the sum and the number of values. The totals
    over all files are combined as

        variance = (sum_of_squares - sum * mean) / count

    and the result is written to output_file_name as a single line.

    Raises ValueError if an input file does not hold exactly three numbers,
    and ZeroDivisionError if the total count is zero (e.g. no input files).
    The output file is only created once the variance has been computed, so
    a failure does not leave an empty result file behind.
    """
    #
    #   initialise variables
    #
    all_sum_squared = 0.0
    all_sum = 0.0
    all_cnt_values = 0.0
    #
    # add up all the sum_squared, sum and cnt_values from all the chunks
    #
    for input_file_name in input_file_names:
        with open(input_file_name) as ii:
            sum_squared, chunk_sum, cnt_values = [float(line) for line in ii]
        all_sum_squared += sum_squared
        all_sum += chunk_sum
        all_cnt_values += cnt_values
    all_mean = all_sum / all_cnt_values
    variance = (all_sum_squared - all_sum * all_mean) / all_cnt_values
    #
    #   print output
    #
    with open(output_file_name, "w") as output:
        print(variance, file=output)
136
137
# Python 2 provides StringIO as its own module; fall back to io.StringIO
# when that module is not available. Only an import failure triggers the
# fallback, so unrelated errors are no longer silently swallowed.
try:
    from StringIO import StringIO
except ImportError:
    from io import StringIO
142
143
class Test_ruffus(unittest.TestCase):
    """
    Runs the random numbers -> chunks -> sums -> variance pipeline and checks
    that the final "variance.result" file is produced in tempdir.
    """

    def setUp(self):
        """Remove any scratch directory left over from a previous run."""
        # Best-effort cleanup: a missing directory is not an error.
        shutil.rmtree(tempdir, ignore_errors=True)

    def tearDown(self):
        """Remove the scratch directory created by the test."""
        shutil.rmtree(tempdir, ignore_errors=True)

    def _assert_variance_result_exists(self):
        """Fail the test if the pipeline did not write its final output."""
        output_file = os.path.join(tempdir, "variance.result")
        self.assertTrue(os.path.exists(output_file),
                        "Missing %s" % output_file)

    def atest_ruffus(self):
        """
        Run the decorator-defined pipeline (named "main").

        The method name does not start with "test", so unittest's default
        discovery does not collect it; it only runs when called explicitly.
        """
        # Imported here because the module-level ruffus import does not
        # bring pipeline_run into scope.
        from ruffus import pipeline_run
        pipeline_run(multiprocess=50, verbose=0, pipeline="main")
        self._assert_variance_result_exists()

    def test_newstyle_ruffus(self):
        """Build the same pipeline with the Pipeline object API and run it."""
        test_pipeline = Pipeline("test")

        test_pipeline.files(create_random_numbers, None, tempdir + "random_numbers.list")\
            .follows(mkdir(tempdir))

        test_pipeline.split(task_func=step_4_split_numbers_into_chunks,
                            input=tempdir + "random_numbers.list",
                            output=tempdir + "*.chunks")\
            .follows(create_random_numbers)

        test_pipeline.transform(task_func=step_5_calculate_sum_of_squares,
                                input=step_4_split_numbers_into_chunks,
                                filter=suffix(".chunks"),
                                output=".sums")

        test_pipeline.merge(task_func=step_6_calculate_variance,
                            input=step_5_calculate_sum_of_squares,
                            output=os.path.join(tempdir, "variance.result"))\
            .posttask(lambda: sys.stdout.write("     hooray\n"))\
            .posttask(print_hooray_again, print_whoppee_again,
                      touch_file(os.path.join(tempdir, "done")))

        test_pipeline.run(multiprocess=50, verbose=0)
        self._assert_variance_result_exists()
188
189
# Run this module's test cases when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
192