#!/usr/local/bin/python3.8
from __future__ import print_function

import glob
import itertools
import os
import random
import shutil
import sys
import unittest


NUMBER_OF_RANDOMS = 10000
CHUNK_SIZE = 1000


# Scratch directory named after this script (extension stripped), expressed
# relative to the current working directory, with a trailing "/" so file
# names can be appended by plain string concatenation.
tempdir = os.path.relpath(os.path.abspath(os.path.splitext(__file__)[0])) + "/"

# add grandparent to search path for testing
grandparent_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, grandparent_dir)

# module name = script name without extension
module_name = os.path.splitext(os.path.basename(__file__))[0]

# Imported only AFTER the search path has been extended: importing it earlier
# (as before) meant the grandparent directory inserted above could never be
# used to locate ruffus.
from ruffus import follows, split, mkdir, files, transform, suffix, posttask, touch_file, merge, Pipeline  # noqa: E402


# ---------------------------------------------------------------
#
#   Create random numbers
#
@follows(mkdir(tempdir))
@files(None, tempdir + "random_numbers.list")
def create_random_numbers(input_file_name, output_file_name):
    """
    Write NUMBER_OF_RANDOMS random numbers to ``output_file_name``.

    Each line holds ``random.random() * 100.0`` formatted with ``%g``.
    ``input_file_name`` is not used (``@files`` is given ``None`` for it).
    """
    with open(output_file_name, "w") as oo:
        for _ in range(NUMBER_OF_RANDOMS):
            oo.write("%g\n" % (random.random() * 100.0))


# ---------------------------------------------------------------
#
#   Split initial file
#
@follows(create_random_numbers)
@split(tempdir + "random_numbers.list", tempdir + "*.chunks")
def step_4_split_numbers_into_chunks(input_file_name, output_files):
    """
    Split the random numbers file into files of at most CHUNK_SIZE lines.

    Chunks are written as ``<tempdir>1.chunks``, ``<tempdir>2.chunks``, ...
    in input order; the last chunk may be shorter. An empty input produces
    no chunk files. ``output_files`` is not used here.
    """
    #
    # clean up chunk files from previous runs; they live in tempdir (the
    # same directory the chunks below are written to), not in the cwd
    #
    for stale in glob.glob(tempdir + "*.chunks"):
        os.unlink(stale)
    #
    # copy successive runs of CHUNK_SIZE lines into numbered files; each
    # output file is closed by its own `with`, even if a write fails
    #
    with open(input_file_name) as ii:
        for cnt_files in itertools.count(1):
            chunk = list(itertools.islice(ii, CHUNK_SIZE))
            if not chunk:
                break
            with open(tempdir + "%d.chunks" % cnt_files, "w") as oo:
                oo.writelines(chunk)
# ---------------------------------------------------------------
#
#   Calculate sum and sum of squares for each chunk file
#
@transform(step_4_split_numbers_into_chunks, suffix(".chunks"), ".sums")
def step_5_calculate_sum_of_squares(input_file_name, output_file_name):
    """
    Summarise one chunk file of numbers (one value per line).

    Writes three lines to ``output_file_name``: ``repr`` of the sum of
    squares, ``repr`` of the sum, then the integer count of values.
    The output is only created once the input has been read completely.
    """
    sum_squared = 0.0
    total = 0.0
    cnt_values = 0
    with open(input_file_name) as ii:
        for line in ii:
            val = float(line.rstrip())
            sum_squared += val * val
            total += val
            cnt_values += 1
    with open(output_file_name, "w") as oo:
        oo.write("%s\n%s\n%d\n" % (repr(sum_squared), repr(total), cnt_values))


def print_hooray_again():
    print(" hooray again")


def print_whoppee_again():
    print(" whoppee again")


# ---------------------------------------------------------------
#
#   Calculate variance from the per-chunk sums
#
@posttask(lambda: sys.stdout.write(" hooray\n"))
@posttask(print_hooray_again, print_whoppee_again, touch_file(os.path.join(tempdir, "done")))
@merge(step_5_calculate_sum_of_squares, os.path.join(tempdir, "variance.result"))
def step_6_calculate_variance(input_file_names, output_file_name):
    """
    Calculate variance naively.

    Each input file holds three numbers (sum of squares, sum, count). They
    are added up across all inputs and the population variance
    ``(sum_sq - sum * mean) / n`` is printed to ``output_file_name``.
    Raises ZeroDivisionError when the total count is zero.
    """
    all_sum_squared = 0.0
    all_sum = 0.0
    all_cnt_values = 0.0
    #
    # add up sum_squared, sum and cnt_values from all the chunks
    #
    for input_file_name in input_file_names:
        with open(input_file_name) as ii:
            sum_squared, chunk_sum, cnt_values = map(float, ii.readlines())
        all_sum_squared += sum_squared
        all_sum += chunk_sum
        all_cnt_values += cnt_values
    all_mean = all_sum / all_cnt_values
    variance = (all_sum_squared - all_sum * all_mean) / all_cnt_values
    #
    # only create the result file once the computation has succeeded
    #
    with open(output_file_name, "w") as output:
        print(variance, file=output)


try:
    from StringIO import StringIO
except ImportError:
    from io import StringIO


class Test_ruffus(unittest.TestCase):
    def setUp(self):
        # start from a clean scratch directory; it may not exist yet
        shutil.rmtree(tempdir, ignore_errors=True)

    def tearDown(self):
        shutil.rmtree(tempdir, ignore_errors=True)

    def _assert_result_exists(self):
        """Fail the test unless the merged variance result was written."""
        output_file = os.path.join(tempdir, "variance.result")
        self.assertTrue(os.path.exists(output_file), "Missing %s" % output_file)

    def atest_ruffus(self):
        # Disabled (name lacks the "test" prefix). Runs the pipeline built by
        # the decorators above; pipeline_run was previously never imported.
        from ruffus import pipeline_run
        pipeline_run(multiprocess=50, verbose=0, pipeline="main")
        self._assert_result_exists()

    def test_newstyle_ruffus(self):
        test_pipeline = Pipeline("test")

        test_pipeline.files(create_random_numbers, None, tempdir + "random_numbers.list")\
            .follows(mkdir(tempdir))

        test_pipeline.split(task_func=step_4_split_numbers_into_chunks,
                            input=tempdir + "random_numbers.list",
                            output=tempdir + "*.chunks")\
            .follows(create_random_numbers)

        test_pipeline.transform(task_func=step_5_calculate_sum_of_squares,
                                input=step_4_split_numbers_into_chunks,
                                filter=suffix(".chunks"),
                                output=".sums")

        test_pipeline.merge(task_func=step_6_calculate_variance,
                            input=step_5_calculate_sum_of_squares,
                            output=os.path.join(tempdir, "variance.result"))\
            .posttask(lambda: sys.stdout.write(" hooray\n"))\
            .posttask(print_hooray_again, print_whoppee_again,
                      touch_file(os.path.join(tempdir, "done")))

        test_pipeline.run(multiprocess=50, verbose=0)
        self._assert_result_exists()


if __name__ == '__main__':
    unittest.main()