#!/usr/bin/env python

import os
import sys
import time
import bisect
import signal
import optparse
import traceback

try:
  import whisper
except ImportError:
  raise SystemExit('[ERROR] Please make sure whisper is installed properly')

# Restore the default SIGPIPE behaviour so the script exits quietly (instead
# of raising a broken-pipe error) when its output is piped to a command that
# closes the pipe early.
signal.signal(signal.SIGPIPE, signal.SIG_DFL)

now = int(time.time())

option_parser = optparse.OptionParser(
    usage='''%prog path timePerPoint:timeToStore [timePerPoint:timeToStore]*

timePerPoint and timeToStore specify lengths of time, for example:

60:1440      60 seconds per datapoint, 1440 datapoints = 1 day of retention
15m:8        15 minutes per datapoint, 8 datapoints = 2 hours of retention
1h:7d        1 hour per datapoint, 7 days of retention
12h:2y       12 hours per datapoint, 2 years of retention
''')

option_parser.add_option(
    '--xFilesFactor', default=None,
    type='float', help="Change the xFilesFactor")
option_parser.add_option(
    '--aggregationMethod', default=None,
    type='string', help="Change the aggregation function (%s)" %
    ', '.join(whisper.aggregationMethods))
option_parser.add_option(
    '--force', default=False, action='store_true',
    help="Perform a destructive change")
option_parser.add_option(
    '--newfile', default=None, action='store',
    help="Create a new database file without removing the existing one")
option_parser.add_option(
    '--nobackup', action='store_true',
    help='Delete the .bak file after successful execution')
option_parser.add_option(
    '--aggregate', action='store_true',
    help='Try to aggregate the values to fit the new archive better.'
         ' Note that this will make things slower and use more memory.')
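# Example invocation (illustrative; the script is typically installed as
# whisper-resize.py):
#   whisper-resize.py /path/to/metric.wsp 60s:1d 5m:30d 1h:2y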

(options, args) = option_parser.parse_args()

if len(args) < 2:
  option_parser.print_help()
  sys.exit(1)

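# The first positional argument is the whisper file to resize; every
# remaining argument is a retention definition for the new archive layout.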
path = args[0]

if not os.path.exists(path):
  sys.stderr.write("[ERROR] File '%s' does not exist!\n\n" % path)
  option_parser.print_help()
  sys.exit(1)

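# Read the existing file's metadata and parse the requested retention
# definitions (whisper.parseRetentionDef turns e.g. '1h:7d' into a
# (secondsPerPoint, points) tuple).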
info = whisper.info(path)

new_archives = [whisper.parseRetentionDef(retentionDef)
                for retentionDef in args[1:]]

old_archives = info['archives']
# sort by precision, lowest to highest
old_archives.sort(key=lambda a: a['secondsPerPoint'], reverse=True)

if options.xFilesFactor is None:
  xff = info['xFilesFactor']
else:
  xff = options.xFilesFactor

if options.aggregationMethod is None:
  aggregationMethod = info['aggregationMethod']
else:
  aggregationMethod = options.aggregationMethod

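# Pull the full retention window out of every existing archive up front.
# whisper.fetch returns ((fromTime, untilTime, step), values), which is
# stashed on each archive dict for the migration step below.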
print('Retrieving all data from the archives')
for archive in old_archives:
  fromTime = now - archive['retention'] + archive['secondsPerPoint']
  untilTime = now
  timeinfo, values = whisper.fetch(path, fromTime, untilTime)
  archive['data'] = (timeinfo, values)

if options.newfile is None:
  tmpfile = path + '.tmp'
  if os.path.exists(tmpfile):
    print('Removing previous temporary database file: %s' % tmpfile)
    os.unlink(tmpfile)
  newfile = tmpfile
else:
  newfile = options.newfile

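# Create the replacement database with the new retention scheme, carrying
# over the xFilesFactor and aggregationMethod unless they were overridden
# on the command line.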
print('Creating new whisper database: %s' % newfile)
whisper.create(newfile, new_archives, xFilesFactor=xff,
               aggregationMethod=aggregationMethod)
size = os.stat(newfile).st_size
print('Created: %s (%d bytes)' % (newfile, size))

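# Two migration strategies follow: with --aggregate the old datapoints are
# re-bucketed onto each new archive's grid (slower, more memory), otherwise
# every non-None datapoint is replayed into the new file as-is.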
if options.aggregate:
  # This is where data will be interpolated (best effort)
  print('Migrating data with aggregation...')
  all_datapoints = []
  for archive in old_archives:
    # Loading all datapoints into memory for fast querying
    timeinfo, values = archive['data']
    new_datapoints = list(zip(range(*timeinfo), values))
    if all_datapoints:
      # Only append datapoints that are newer than what has already been
      # collected from the lower-precision archives.
      last_timestamp = all_datapoints[-1][0]
      slice_end = len(new_datapoints)
      for i, (timestamp, value) in enumerate(new_datapoints):
        if timestamp > last_timestamp:
          slice_end = i
          break
      all_datapoints += new_datapoints[slice_end:]
    else:
      all_datapoints += new_datapoints

  oldtimestamps = list(map(lambda p: p[0], all_datapoints))
  oldvalues = list(map(lambda p: p[1], all_datapoints))

  print("oldtimestamps: %s" % oldtimestamps)
  # Simply cleaning up some used memory
  del all_datapoints

  new_info = whisper.info(newfile)
  new_archives = new_info['archives']

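  # Walk each new archive's time grid and aggregate the old datapoints that
  # fall into each bucket, honouring the xFilesFactor threshold.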
  for archive in new_archives:
    step = archive['secondsPerPoint']
    fromTime = now - archive['retention'] + now % step
    untilTime = now + now % step + step
    print("(%s,%s,%s)" % (fromTime, untilTime, step))
    timepoints_to_update = list(range(fromTime, untilTime, step))
    print("timepoints_to_update: %s" % timepoints_to_update)
    newdatapoints = []
    for tinterval in zip(timepoints_to_update[:-1], timepoints_to_update[1:]):
      # TODO: Set the lo= parameter for 'lefti' based on righti from the
      #       previous iteration. Obviously, this can only be done if
      #       timepoints_to_update is monotonically increasing. Is it?
      lefti = bisect.bisect_left(oldtimestamps, tinterval[0])
      righti = bisect.bisect_left(oldtimestamps, tinterval[1], lo=lefti)
      newvalues = oldvalues[lefti:righti]
      if newvalues:
        non_none = list(filter(lambda x: x is not None, newvalues))
        if 1.0 * len(non_none) / len(newvalues) >= xff:
          newdatapoints.append([tinterval[0],
                                whisper.aggregate(aggregationMethod,
                                                  non_none, newvalues)])
    whisper.update_many(newfile, newdatapoints)
else:
  print('Migrating data without aggregation...')
  for archive in old_archives:
    timeinfo, values = archive['data']
    datapoints = list(zip(range(*timeinfo), values))
    datapoints = list(filter(lambda p: p[1] is not None, datapoints))
    whisper.update_many(newfile, datapoints)

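# With --newfile the converted copy is left alongside the original and the
# swap below is skipped entirely.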
if options.newfile is not None:
  sys.exit(0)

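# Swap the freshly written .tmp file into place, keeping the original as a
# .bak copy so a failed rename can be rolled back.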
backup = path + '.bak'
print('Renaming old database to: %s' % backup)
os.rename(path, backup)

try:
  print('Renaming new database to: %s' % path)
  os.rename(tmpfile, path)
except Exception:
  traceback.print_exc()
  print('\nOperation failed, restoring backup')
  os.rename(backup, path)
  sys.exit(1)

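# --nobackup trades safety for disk space: drop the .bak copy once the swap
# has succeeded.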
if options.nobackup:
  print("Unlinking backup: %s" % backup)
  os.unlink(backup)