1#!/usr/bin/env python 2# -*- coding: utf-8; py-indent-offset:4 -*- 3############################################################################### 4# 5# Copyright (C) 2015, 2016, 2017 Daniel Rodriguez 6# 7# This program is free software: you can redistribute it and/or modify 8# it under the terms of the GNU General Public License as published by 9# the Free Software Foundation, either version 3 of the License, or 10# (at your option) any later version. 11# 12# This program is distributed in the hope that it will be useful, 13# but WITHOUT ANY WARRANTY; without even the implied warranty of 14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15# GNU General Public License for more details. 16# 17# You should have received a copy of the GNU General Public License 18# along with this program. If not, see <http://www.gnu.org/licenses/>. 19# 20############################################################################### 21from __future__ import (absolute_import, division, print_function, 22 unicode_literals) 23 24import backtrader as bt 25import backtrader.feed as feed 26from ..utils import date2num 27import datetime as dt 28 29TIMEFRAMES = dict( 30 ( 31 (bt.TimeFrame.Seconds, 's'), 32 (bt.TimeFrame.Minutes, 'm'), 33 (bt.TimeFrame.Days, 'd'), 34 (bt.TimeFrame.Weeks, 'w'), 35 (bt.TimeFrame.Months, 'm'), 36 (bt.TimeFrame.Years, 'y'), 37 ) 38) 39 40 41class InfluxDB(feed.DataBase): 42 frompackages = ( 43 ('influxdb', [('InfluxDBClient', 'idbclient')]), 44 ('influxdb.exceptions', 'InfluxDBClientError') 45 ) 46 47 params = ( 48 ('host', '127.0.0.1'), 49 ('port', '8086'), 50 ('username', None), 51 ('password', None), 52 ('database', None), 53 ('timeframe', bt.TimeFrame.Days), 54 ('startdate', None), 55 ('high', 'high_p'), 56 ('low', 'low_p'), 57 ('open', 'open_p'), 58 ('close', 'close_p'), 59 ('volume', 'volume'), 60 ('ointerest', 'oi'), 61 ) 62 63 def start(self): 64 super(InfluxDB, self).start() 65 try: 66 self.ndb = idbclient(self.p.host, self.p.port, self.p.username, 67 self.p.password, self.p.database) 68 except InfluxDBClientError as err: 69 print('Failed to establish connection to InfluxDB: %s' % err) 70 71 tf = '{multiple}{timeframe}'.format( 72 multiple=(self.p.compression if self.p.compression else 1), 73 timeframe=TIMEFRAMES.get(self.p.timeframe, 'd')) 74 75 if not self.p.startdate: 76 st = '<= now()' 77 else: 78 st = '>= \'%s\'' % self.p.startdate 79 80 # The query could already consider parameters like fromdate and todate 81 # to have the database skip them and not the internal code 82 qstr = ('SELECT mean("{open_f}") AS "open", mean("{high_f}") AS "high", ' 83 'mean("{low_f}") AS "low", mean("{close_f}") AS "close", ' 84 'mean("{vol_f}") AS "volume", mean("{oi_f}") AS "openinterest" ' 85 'FROM "{dataname}" ' 86 'WHERE time {begin} ' 87 'GROUP BY time({timeframe}) fill(none)').format( 88 open_f=self.p.open, high_f=self.p.high, 89 low_f=self.p.low, close_f=self.p.close, 90 vol_f=self.p.volume, oi_f=self.p.ointerest, 91 timeframe=tf, begin=st, dataname=self.p.dataname) 92 93 try: 94 dbars = list(self.ndb.query(qstr).get_points()) 95 except InfluxDBClientError as err: 96 print('InfluxDB query failed: %s' % err) 97 98 self.biter = iter(dbars) 99 100 def _load(self): 101 try: 102 bar = next(self.biter) 103 except StopIteration: 104 return False 105 106 self.l.datetime[0] = date2num(dt.datetime.strptime(bar['time'], 107 '%Y-%m-%dT%H:%M:%SZ')) 108 109 self.l.open[0] = bar['open'] 110 self.l.high[0] = bar['high'] 111 self.l.low[0] = bar['low'] 112 self.l.close[0] = bar['close'] 113 self.l.volume[0] = bar['volume'] 114 115 return True 116