#!/usr/bin/env python
# -*- coding: utf-8; py-indent-offset:4 -*-
###############################################################################
#
# Copyright (C) 2015, 2016, 2017 Daniel Rodriguez
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

from datetime import datetime, timedelta

from backtrader.feed import DataBase
from backtrader import TimeFrame, date2num, num2date
from backtrader.utils.py3 import (integer_types, queue, string_types,
                                  with_metaclass)
from backtrader.metabase import MetaParams
from backtrader.stores import oandastore


class MetaOandaData(DataBase.__class__):
    '''Metaclass which registers the data feed class with ``OandaStore``.'''

    def __init__(cls, name, bases, dct):
        '''Class has already been created ... register

        The regular metaclass initialization runs first and then the newly
        created class is stored in ``OandaStore.DataCls``.
        '''
        # Initialize the class
        super(MetaOandaData, cls).__init__(name, bases, dct)

        # Register with the store
        oandastore.OandaStore.DataCls = cls


class OandaData(with_metaclass(MetaOandaData, DataBase)):
    '''Oanda Data Feed.

    Params:

      - ``qcheck`` (default: ``0.5``)

        Time in seconds to wake up if no data is received to give a chance to
        resample/replay packets properly and pass notifications up the chain

      - ``historical`` (default: ``False``)

        If set to ``True`` the data feed will stop after doing the first
        download of data.

        The standard data feed parameters ``fromdate`` and ``todate`` will be
        used as reference.

        The data feed will make multiple requests if the requested duration is
        larger than the one allowed by Oanda given the timeframe/compression
        chosen for the data.

      - ``backfill_start`` (default: ``True``)

        Perform backfilling at the start. The maximum possible historical data
        will be fetched in a single request.

      - ``backfill`` (default: ``True``)

        Perform backfilling after a disconnection/reconnection cycle. The gap
        duration will be used to download the smallest possible amount of data

      - ``backfill_from`` (default: ``None``)

        An additional data source can be passed to do an initial layer of
        backfilling. Once the data source is depleted and if requested,
        backfilling from Oanda will take place. This is ideally meant to
        backfill from already stored sources like a file on disk, but not
        limited to.

      - ``bidask`` (default: ``True``)

        If ``True``, then the historical/backfilling requests will request
        bid/ask prices from the server

        If ``False``, then *midpoint* will be requested

      - ``useask`` (default: ``False``)

        If ``True`` the *ask* part of the *bidask* prices will be used instead
        of the default use of *bid*

      - ``includeFirst`` (default: ``True``)

        Influence the delivery of the 1st bar of a historical/backfilling
        request by setting the parameter directly to the Oanda API calls

      - ``reconnect`` (default: ``True``)

        Reconnect when network connection is down

      - ``reconnections`` (default: ``-1``)

        Number of times to attempt reconnections: ``-1`` means forever

      - ``reconntimeout`` (default: ``5.0``)

        Time in seconds to wait in between reconnection attempts

    This data feed supports only this mapping of ``timeframe`` and
    ``compression``, which comply with the definitions in the OANDA API
    Developer's Guide::

        (TimeFrame.Seconds, 5): 'S5',
        (TimeFrame.Seconds, 10): 'S10',
        (TimeFrame.Seconds, 15): 'S15',
        (TimeFrame.Seconds, 30): 'S30',
        (TimeFrame.Minutes, 1): 'M1',
        (TimeFrame.Minutes, 2): 'M3',
        (TimeFrame.Minutes, 3): 'M3',
        (TimeFrame.Minutes, 4): 'M4',
        (TimeFrame.Minutes, 5): 'M5',
        (TimeFrame.Minutes, 10): 'M10',
        (TimeFrame.Minutes, 15): 'M15',
        (TimeFrame.Minutes, 30): 'M30',
        (TimeFrame.Minutes, 60): 'H1',
        (TimeFrame.Minutes, 120): 'H2',
        (TimeFrame.Minutes, 180): 'H3',
        (TimeFrame.Minutes, 240): 'H4',
        (TimeFrame.Minutes, 360): 'H6',
        (TimeFrame.Minutes, 480): 'H8',
        (TimeFrame.Days, 1): 'D',
        (TimeFrame.Weeks, 1): 'W',
        (TimeFrame.Months, 1): 'M',

    Any other combination will be rejected
    '''
    params = (
        ('qcheck', 0.5),
        ('historical', False),  # stop after the initial historical download
        ('backfill_start', True),  # do backfilling at the start
        ('backfill', True),  # do backfilling when reconnecting
        ('backfill_from', None),  # additional data source to do backfill from
        ('bidask', True),
        ('useask', False),
        ('includeFirst', True),
        ('reconnect', True),
        ('reconnections', -1),  # forever
        ('reconntimeout', 5.0),
    )

    _store = oandastore.OandaStore

    # States for the Finite State Machine in _load
    _ST_FROM, _ST_START, _ST_LIVE, _ST_HISTORBACK, _ST_OVER = range(5)

    _TOFFSET = timedelta()

    def _timeoffset(self):
        '''Returns the (class level) time offset ``_TOFFSET``'''
        # Effective way to overcome the non-notification?
        return self._TOFFSET

    def islive(self):
        '''Returns ``True`` to notify ``Cerebro`` that preloading and runonce
        should be deactivated'''
        return True

    def __init__(self, **kwargs):
        '''Creates the store with ``kwargs`` and selects the candle format
        (``bidask`` or ``midpoint``) out of the ``bidask`` parameter'''
        self.o = self._store(**kwargs)
        self._candleFormat = 'bidask' if self.p.bidask else 'midpoint'

    def setenvironment(self, env):
        '''Receives an environment (cerebro) and passes it over to the store it
        belongs to'''
        super(OandaData, self).setenvironment(env)
        env.addstore(self.o)

    def start(self):
        '''Starts the Oanda connection and gets the real contract and
        contractdetails if it exists

        If the timeframe/compression is not supported or the instrument
        cannot be retrieved, the corresponding notification is issued and the
        feed is left in the ``_ST_OVER`` state.
        '''
        super(OandaData, self).start()

        # Create attributes as soon as possible
        self._statelivereconn = False  # if reconnecting in live state
        self._storedmsg = dict()  # keep pending live message (under None)
        self.qlive = queue.Queue()
        self._state = self._ST_OVER
        # Must be set before _st_start is called below: _st_start loads the
        # reconnection budget from the params and resetting the counter after
        # that call would discard it
        self._reconns = 0

        # Kickstart store and get queue to wait on
        self.o.start(data=self)

        # check if the granularity is supported
        otf = self.o.get_granularity(self._timeframe, self._compression)
        if otf is None:
            self.put_notification(self.NOTSUPPORTED_TF)
            self._state = self._ST_OVER
            return

        self.contractdetails = cd = self.o.get_instrument(self.p.dataname)
        if cd is None:
            self.put_notification(self.NOTSUBSCRIBED)
            self._state = self._ST_OVER
            return

        if self.p.backfill_from is not None:
            self._state = self._ST_FROM
            self.p.backfill_from._start()
        else:
            self._start_finish()
            self._state = self._ST_START  # initial state for _load
            self._st_start()

    def _st_start(self, instart=True, tmout=None):
        '''Requests the data from the store and sets the next state

        With ``historical`` set, a candles request bounded by
        ``fromdate``/``todate`` (if set) is issued and the state changes to
        ``_ST_HISTORBACK``. Otherwise the streaming of prices is started and
        the state changes to ``_ST_LIVE``.

        Args:

          - ``instart``: ``True`` if called during ``start``. It selects
            ``backfill_start`` (``True``) or ``backfill`` (``False``) to
            decide if backfilling is wished and, if ``True``, reloads the
            reconnection counter from ``reconnections``

          - ``tmout``: passed as is to the ``streaming_prices`` method of the
            store

        Returns ``True`` in all cases
        '''
        if self.p.historical:
            self.put_notification(self.DELAYED)
            dtend = None
            if self.todate < float('inf'):
                dtend = num2date(self.todate)

            dtbegin = None
            if self.fromdate > float('-inf'):
                dtbegin = num2date(self.fromdate)

            self.qhist = self.o.candles(
                self.p.dataname, dtbegin, dtend,
                self._timeframe, self._compression,
                candleFormat=self._candleFormat,
                includeFirst=self.p.includeFirst)

            self._state = self._ST_HISTORBACK
            return True

        self.qlive = self.o.streaming_prices(self.p.dataname, tmout=tmout)
        if instart:
            self._statelivereconn = self.p.backfill_start
        else:
            self._statelivereconn = self.p.backfill

        if self._statelivereconn:
            self.put_notification(self.DELAYED)

        self._state = self._ST_LIVE
        if instart:
            self._reconns = self.p.reconnections

        return True  # no return before - implicit continue

    def stop(self):
        '''Stops and tells the store to stop'''
        super(OandaData, self).stop()
        self.o.stop()

    def haslivedata(self):
        '''Returns ``True`` if a message is stored or ``qlive`` is truthy'''
        # NOTE(review): if ``qlive`` is a standard ``queue.Queue`` the object
        # itself is always truthy (it defines neither ``__bool__`` nor
        # ``__len__``) and this would always return ``True``. Confirm whether
        # ``not self.qlive.empty()`` was intended before changing it
        return bool(self._storedmsg or self.qlive)  # do not return the objs

    def _try_reconnect(self):
        '''Starts a new reconnection attempt if the params still allow it

        Returns ``True`` if a new streaming request has been issued. If
        reconnecting is disabled or the attempts are exhausted,
        ``DISCONNECTED`` is notified, the state is set to ``_ST_OVER`` and
        ``False`` is returned
        '''
        if not self.p.reconnect or self._reconns == 0:
            # Can no longer reconnect
            self.put_notification(self.DISCONNECTED)
            self._state = self._ST_OVER
            return False

        # Can reconnect. With the default of -1 the counter never reaches 0
        self._reconns -= 1
        self._st_start(instart=False, tmout=self.p.reconntimeout)
        return True

    def _load(self):
        '''Runs the Finite State Machine which delivers the bars

        Returns:

          - ``True`` if a bar has been loaded into the lines
          - ``None`` if no live message arrived within ``_qcheck`` seconds
          - ``False`` if the feed is over (end of data or unrecoverable error)
        '''
        if self._state == self._ST_OVER:
            return False

        while True:
            if self._state == self._ST_LIVE:
                try:
                    # A message kept back before backfilling has priority
                    # over the queue
                    msg = (self._storedmsg.pop(None, None) or
                           self.qlive.get(timeout=self._qcheck))
                except queue.Empty:
                    return None  # indicate timeout situation

                if msg is None:  # Conn broken while streaming live prices
                    self.put_notification(self.CONNBROKEN)
                    # Try to reconnect
                    if not self._try_reconnect():
                        return False  # failed

                    continue

                if 'code' in msg:
                    self.put_notification(self.CONNBROKEN)
                    code = msg['code']
                    # Only these codes are handled with a reconnection
                    if code not in [599, 598, 596]:
                        self.put_notification(self.DISCONNECTED)
                        self._state = self._ST_OVER
                        return False  # failed

                    if not self._try_reconnect():
                        return False  # failed

                    continue

                # A regular message has arrived: reload the reconnections
                self._reconns = self.p.reconnections

                # Process the message according to expected return type
                if not self._statelivereconn:
                    if self._laststatus != self.LIVE:
                        if self.qlive.qsize() <= 1:  # very short live queue
                            self.put_notification(self.LIVE)

                    ret = self._load_tick(msg)
                    if ret:
                        return True

                    # could not load bar ... go and get new one
                    continue

                # Fall through to processing reconnect - try to backfill
                self._storedmsg[None] = msg  # keep the msg

                # else do a backfill
                if self._laststatus != self.DELAYED:
                    self.put_notification(self.DELAYED)

                if len(self) > 1:
                    # len == 1 ... forwarded for the 1st time
                    dtbegin = self.datetime.datetime(-1)
                elif self.fromdate > float('-inf'):
                    dtbegin = num2date(self.fromdate)
                else:  # 1st bar and no begin set
                    # passing None to fetch max possible in 1 request
                    dtbegin = None

                # The backfill ends at the time of the kept live message
                dtend = datetime.utcfromtimestamp(int(msg['time']) / 10 ** 6)

                self.qhist = self.o.candles(
                    self.p.dataname, dtbegin, dtend,
                    self._timeframe, self._compression,
                    candleFormat=self._candleFormat,
                    includeFirst=self.p.includeFirst)

                self._state = self._ST_HISTORBACK
                self._statelivereconn = False  # no longer in live
                continue

            elif self._state == self._ST_HISTORBACK:
                msg = self.qhist.get()
                if msg is None:  # Conn broken during historical/backfilling
                    # Situation not managed. Simply bail out
                    self.put_notification(self.DISCONNECTED)
                    self._state = self._ST_OVER
                    return False  # error management cancelled the queue

                elif 'code' in msg:  # Error
                    self.put_notification(self.NOTSUBSCRIBED)
                    self.put_notification(self.DISCONNECTED)
                    self._state = self._ST_OVER
                    return False

                if msg:
                    if self._load_history(msg):
                        return True  # loading worked

                    continue  # not loaded ... date may have been seen
                else:
                    # End of histdata
                    if self.p.historical:  # only historical
                        self.put_notification(self.DISCONNECTED)
                        self._state = self._ST_OVER
                        return False  # end of historical

                # Live is also wished - go for it
                self._state = self._ST_LIVE
                continue

            elif self._state == self._ST_FROM:
                if not self.p.backfill_from.next():
                    # additional data source is consumed
                    self._state = self._ST_START
                    continue

                # copy lines of the same name
                for alias in self.lines.getlinealiases():
                    lsrc = getattr(self.p.backfill_from.lines, alias)
                    ldst = getattr(self.lines, alias)

                    ldst[0] = lsrc[0]

                return True

            elif self._state == self._ST_START:
                if not self._st_start(instart=False):
                    self._state = self._ST_OVER
                    return False

    def _load_tick(self, msg):
        '''Loads a live price message into the lines as a single price bar

        The ``ask`` price is used if ``useask`` is ``True``, else ``bid``.
        Volume and open interest are set to ``0.0``.

        Returns ``False`` if the time of the message is not later than the
        time of the previous bar, else ``True``
        '''
        dtobj = datetime.utcfromtimestamp(int(msg['time']) / 10 ** 6)
        dt = date2num(dtobj)
        if dt <= self.lines.datetime[-1]:
            return False  # time already seen

        # Common fields
        self.lines.datetime[0] = dt
        self.lines.volume[0] = 0.0
        self.lines.openinterest[0] = 0.0

        # Put the prices into the bar
        tick = float(msg['ask']) if self.p.useask else float(msg['bid'])
        self.lines.open[0] = tick
        self.lines.high[0] = tick
        self.lines.low[0] = tick
        self.lines.close[0] = tick

        return True

    def _load_history(self, msg):
        '''Loads a historical candle message into the lines

        The prices are taken from the ``Bid``, ``Ask`` or ``Mid`` fields
        according to the ``bidask`` and ``useask`` parameters.

        Returns ``False`` if the time of the message is not later than the
        time of the previous bar, else ``True``
        '''
        dtobj = datetime.utcfromtimestamp(int(msg['time']) / 10 ** 6)
        dt = date2num(dtobj)
        if dt <= self.lines.datetime[-1]:
            return False  # time already seen

        # Common fields
        self.lines.datetime[0] = dt
        self.lines.volume[0] = float(msg['volume'])
        self.lines.openinterest[0] = 0.0

        # Put the prices into the bar
        if self.p.bidask:
            if not self.p.useask:
                self.lines.open[0] = float(msg['openBid'])
                self.lines.high[0] = float(msg['highBid'])
                self.lines.low[0] = float(msg['lowBid'])
                self.lines.close[0] = float(msg['closeBid'])
            else:
                self.lines.open[0] = float(msg['openAsk'])
                self.lines.high[0] = float(msg['highAsk'])
                self.lines.low[0] = float(msg['lowAsk'])
                self.lines.close[0] = float(msg['closeAsk'])
        else:
            self.lines.open[0] = float(msg['openMid'])
            self.lines.high[0] = float(msg['highMid'])
            self.lines.low[0] = float(msg['lowMid'])
            self.lines.close[0] = float(msg['closeMid'])

        return True