#!/usr/bin/env python
"""Resize a whisper database to a new retention scheme.

Fetches all existing data, creates a new database with the requested
retentions, migrates the data into it (optionally re-aggregating values to
fit the new archive layout with --aggregate), then atomically swaps the new
file into place, keeping a ``.bak`` copy of the original unless --nobackup
is given.  With --newfile the resized copy is written alongside instead of
replacing the original.
"""

import os
import sys
import time
import bisect
import signal
import optparse
import traceback

try:
    import whisper
except ImportError:
    raise SystemExit('[ERROR] Please make sure whisper is installed properly')

# Ignore SIGPIPE so piping our output into e.g. `head` exits quietly.
signal.signal(signal.SIGPIPE, signal.SIG_DFL)

# Single reference "now" so every archive is fetched over a consistent window.
now = int(time.time())

option_parser = optparse.OptionParser(
    usage='''%prog path timePerPoint:timeToStore [timePerPoint:timeToStore]*

timePerPoint and timeToStore specify lengths of time, for example:

60:1440      60 seconds per datapoint, 1440 datapoints = 1 day of retention
15m:8        15 minutes per datapoint, 8 datapoints = 2 hours of retention
1h:7d        1 hour per datapoint, 7 days of retention
12h:2y       12 hours per datapoint, 2 years of retention
''')

option_parser.add_option(
    '--xFilesFactor', default=None,
    type='float', help="Change the xFilesFactor")
option_parser.add_option(
    '--aggregationMethod', default=None,
    type='string', help="Change the aggregation function (%s)" %
    ', '.join(whisper.aggregationMethods))
option_parser.add_option(
    '--force', default=False, action='store_true',
    help="Perform a destructive change")
option_parser.add_option(
    '--newfile', default=None, action='store',
    help="Create a new database file without removing the existing one")
option_parser.add_option(
    '--nobackup', action='store_true',
    help='Delete the .bak file after successful execution')
option_parser.add_option(
    '--aggregate', action='store_true',
    help='Try to aggregate the values to fit the new archive better.'
         ' Note that this will make things slower and use more memory.')

(options, args) = option_parser.parse_args()

if len(args) < 2:
    option_parser.print_help()
    sys.exit(1)

path = args[0]

if not os.path.exists(path):
    sys.stderr.write("[ERROR] File '%s' does not exist!\n\n" % path)
    option_parser.print_help()
    sys.exit(1)

info = whisper.info(path)

new_archives = [whisper.parseRetentionDef(retentionDef)
                for retentionDef in args[1:]]

old_archives = info['archives']
# sort by precision, lowest to highest (coarsest archive first)
old_archives.sort(key=lambda a: a['secondsPerPoint'], reverse=True)

# Fall back to the existing file's settings when not overridden on the CLI.
if options.xFilesFactor is None:
    xff = info['xFilesFactor']
else:
    xff = options.xFilesFactor

if options.aggregationMethod is None:
    aggregationMethod = info['aggregationMethod']
else:
    aggregationMethod = options.aggregationMethod

print('Retrieving all data from the archives')
for archive in old_archives:
    # Fetch exactly this archive's window at its native resolution.
    fromTime = now - archive['retention'] + archive['secondsPerPoint']
    untilTime = now
    timeinfo, values = whisper.fetch(path, fromTime, untilTime)
    archive['data'] = (timeinfo, values)

if options.newfile is None:
    # Build into a temp file, then swap it into place at the end.
    tmpfile = path + '.tmp'
    if os.path.exists(tmpfile):
        print('Removing previous temporary database file: %s' % tmpfile)
        os.unlink(tmpfile)
    newfile = tmpfile
else:
    newfile = options.newfile

print('Creating new whisper database: %s' % newfile)
whisper.create(newfile, new_archives, xFilesFactor=xff,
               aggregationMethod=aggregationMethod)
size = os.stat(newfile).st_size
print('Created: %s (%d bytes)' % (newfile, size))

if options.aggregate:
    # This is where data will be interpolated (best effort).
    print('Migrating data with aggregation...')
    all_datapoints = []
    for archive in old_archives:
        # Loading all datapoints into memory for fast querying.
        timeinfo, values = archive['data']
        # Materialize: zip() is a lazy iterator on Python 3 and the points
        # are indexed and sliced below.
        new_datapoints = list(zip(range(*timeinfo), values))
        if all_datapoints:
            last_timestamp = all_datapoints[-1][0]
            # Keep only points strictly newer than everything collected from
            # the coarser archives so far.  BUGFIX: default to the list
            # length (append nothing) instead of reusing the stale loop
            # index, which re-appended a duplicate point when no newer
            # timestamp existed.
            slice_end = len(new_datapoints)
            for i, (timestamp, value) in enumerate(new_datapoints):
                if timestamp > last_timestamp:
                    slice_end = i
                    break
            all_datapoints += new_datapoints[slice_end:]
        else:
            all_datapoints += new_datapoints

    # Lists (not lazy map objects): bisect and slicing require sequences.
    oldtimestamps = [point[0] for point in all_datapoints]
    oldvalues = [point[1] for point in all_datapoints]

    print("oldtimestamps: %s" % oldtimestamps)
    # Simply cleaning up some used memory.
    del all_datapoints

    new_info = whisper.info(newfile)
    new_archives = new_info['archives']

    for archive in new_archives:
        step = archive['secondsPerPoint']
        # Align the window to this archive's step boundaries.
        fromTime = now - archive['retention'] + now % step
        untilTime = now + now % step + step
        print("(%s,%s,%s)" % (fromTime, untilTime, step))
        timepoints_to_update = range(fromTime, untilTime, step)
        print("timepoints_to_update: %s" % timepoints_to_update)
        newdatapoints = []
        # Consecutive [start, end) intervals, one per new datapoint slot.
        for tinterval in zip(timepoints_to_update[:-1],
                             timepoints_to_update[1:]):
            # TODO: Setting lo= parameter for 'lefti' based on righti from
            # previous iteration. Obviously, this can only be done if
            # timepoints_to_update is always updated. Is it?
            lefti = bisect.bisect_left(oldtimestamps, tinterval[0])
            righti = bisect.bisect_left(oldtimestamps, tinterval[1], lo=lefti)
            newvalues = oldvalues[lefti:righti]
            if newvalues:
                non_none = [x for x in newvalues if x is not None]
                # Only emit a point when enough of the interval is known,
                # mirroring whisper's own xFilesFactor rule.
                if 1.0 * len(non_none) / len(newvalues) >= xff:
                    newdatapoints.append(
                        [tinterval[0],
                         whisper.aggregate(aggregationMethod,
                                           non_none, newvalues)])
        whisper.update_many(newfile, newdatapoints)
else:
    print('Migrating data without aggregation...')
    for archive in old_archives:
        timeinfo, values = archive['data']
        # Drop gaps (None values) and materialize the lazy zip for
        # update_many (Python 3 compatibility).
        datapoints = [p for p in zip(range(*timeinfo), values)
                      if p[1] is not None]
        whisper.update_many(newfile, datapoints)

if options.newfile is not None:
    # --newfile mode: leave the original untouched, nothing to swap.
    sys.exit(0)

backup = path + '.bak'
print('Renaming old database to: %s' % backup)
os.rename(path, backup)

try:
    print('Renaming new database to: %s' % path)
    os.rename(tmpfile, path)
except BaseException:
    # Deliberately catch everything (including KeyboardInterrupt) so the
    # original database is always restored before we bail out.
    traceback.print_exc()
    print('\nOperation failed, restoring backup')
    os.rename(backup, path)
    sys.exit(1)

if options.nobackup:
    print("Unlinking backup: %s" % backup)
    os.unlink(backup)