1#!/usr/bin/env python
2# -*- coding: utf-8; py-indent-offset:4 -*-
3###############################################################################
4#
5# Copyright (C) 2015, 2016, 2017 Daniel Rodriguez
6#
7# This program is free software: you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation, either version 3 of the License, or
10# (at your option) any later version.
11#
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program.  If not, see <http://www.gnu.org/licenses/>.
19#
20###############################################################################
21from __future__ import (absolute_import, division, print_function,
22                        unicode_literals)
23
24from datetime import datetime, timedelta
25
26from backtrader.feed import DataBase
27from backtrader import TimeFrame, date2num, num2date
28from backtrader.utils.py3 import (integer_types, queue, string_types,
29                                  with_metaclass)
30from backtrader.metabase import MetaParams
31from backtrader.stores import oandastore
32
33
class MetaOandaData(DataBase.__class__):
    '''Metaclass for ``OandaData``.

    Runs after each subclass body has been executed and registers the newly
    created data class with the Oanda store, so the store knows which data
    feed class to instantiate.
    '''
    def __init__(cls, name, bases, clsdict):
        # Let the regular class-creation machinery finish first
        super(MetaOandaData, cls).__init__(name, bases, clsdict)
        # ... and then make the store aware of this data feed class
        oandastore.OandaStore.DataCls = cls
42
43
class OandaData(with_metaclass(MetaOandaData, DataBase)):
    '''Oanda Data Feed.

    Params:

      - ``qcheck`` (default: ``0.5``)

        Time in seconds to wake up if no data is received to give a chance to
        resample/replay packets properly and pass notifications up the chain

      - ``historical`` (default: ``False``)

        If set to ``True`` the data feed will stop after doing the first
        download of data.

        The standard data feed parameters ``fromdate`` and ``todate`` will be
        used as reference.

        The data feed will make multiple requests if the requested duration is
        larger than the one allowed by IB given the timeframe/compression
        chosen for the data.

      - ``backfill_start`` (default: ``True``)

        Perform backfilling at the start. The maximum possible historical data
        will be fetched in a single request.

      - ``backfill`` (default: ``True``)

        Perform backfilling after a disconnection/reconnection cycle. The gap
        duration will be used to download the smallest possible amount of data

      - ``backfill_from`` (default: ``None``)

        An additional data source can be passed to do an initial layer of
        backfilling. Once the data source is depleted and if requested,
        backfilling from IB will take place. This is ideally meant to backfill
        from already stored sources like a file on disk, but not limited to.

      - ``bidask`` (default: ``True``)

        If ``True``, then the historical/backfilling requests will request
        bid/ask prices from the server

        If ``False``, then *midpoint* will be requested

      - ``useask`` (default: ``False``)

        If ``True`` the *ask* part of the *bidask* prices will be used instead
        of the default use of *bid*

      - ``includeFirst`` (default: ``True``)

        Influence the delivery of the 1st bar of a historical/backfilling
        request by setting the parameter directly to the Oanda API calls

      - ``reconnect`` (default: ``True``)

        Reconnect when network connection is down

      - ``reconnections`` (default: ``-1``)

        Number of times to attempt reconnections: ``-1`` means forever

      - ``reconntimeout`` (default: ``5.0``)

        Time in seconds to wait in between reconnection attempts

    This data feed supports only this mapping of ``timeframe`` and
    ``compression``, which comply with the definitions in the OANDA API
    Developer's Guide::

        (TimeFrame.Seconds, 5): 'S5',
        (TimeFrame.Seconds, 10): 'S10',
        (TimeFrame.Seconds, 15): 'S15',
        (TimeFrame.Seconds, 30): 'S30',
        (TimeFrame.Minutes, 1): 'M1',
        (TimeFrame.Minutes, 2): 'M2',
        (TimeFrame.Minutes, 3): 'M3',
        (TimeFrame.Minutes, 4): 'M4',
        (TimeFrame.Minutes, 5): 'M5',
        (TimeFrame.Minutes, 10): 'M10',
        (TimeFrame.Minutes, 15): 'M15',
        (TimeFrame.Minutes, 30): 'M30',
        (TimeFrame.Minutes, 60): 'H1',
        (TimeFrame.Minutes, 120): 'H2',
        (TimeFrame.Minutes, 180): 'H3',
        (TimeFrame.Minutes, 240): 'H4',
        (TimeFrame.Minutes, 360): 'H6',
        (TimeFrame.Minutes, 480): 'H8',
        (TimeFrame.Days, 1): 'D',
        (TimeFrame.Weeks, 1): 'W',
        (TimeFrame.Months, 1): 'M',

    Any other combination will be rejected
    '''
    params = (
        ('qcheck', 0.5),
        ('historical', False),  # stop after the 1st historical download
        ('backfill_start', True),  # do backfilling at the start
        ('backfill', True),  # do backfilling when reconnecting
        ('backfill_from', None),  # additional data source to do backfill from
        ('bidask', True),
        ('useask', False),
        ('includeFirst', True),
        ('reconnect', True),
        ('reconnections', -1),  # forever
        ('reconntimeout', 5.0),
    )

    _store = oandastore.OandaStore

    # States for the Finite State Machine in _load
    _ST_FROM, _ST_START, _ST_LIVE, _ST_HISTORBACK, _ST_OVER = range(5)

    # Fixed (zero) offset between local time and data time
    _TOFFSET = timedelta()

    def _timeoffset(self):
        # Effective way to overcome the non-notification?
        return self._TOFFSET

    def islive(self):
        '''Returns ``True`` to notify ``Cerebro`` that preloading and runonce
        should be deactivated'''
        return True

    def __init__(self, **kwargs):
        self.o = self._store(**kwargs)
        # Price representation requested from the server for candles
        self._candleFormat = 'bidask' if self.p.bidask else 'midpoint'

    def setenvironment(self, env):
        '''Receives an environment (cerebro) and passes it over to the store it
        belongs to'''
        super(OandaData, self).setenvironment(env)
        env.addstore(self.o)

    def start(self):
        '''Starts the Oanda connection and gets the real contract and
        contractdetails if it exists'''
        super(OandaData, self).start()

        # Create attributes as soon as possible
        self._statelivereconn = False  # if reconnecting in live state
        self._storedmsg = dict()  # keep pending live message (under None)
        self.qlive = queue.Queue()
        self._state = self._ST_OVER

        # Kickstart store and get queue to wait on
        self.o.start(data=self)

        # check if the granularity is supported
        otf = self.o.get_granularity(self._timeframe, self._compression)
        if otf is None:
            self.put_notification(self.NOTSUPPORTED_TF)
            self._state = self._ST_OVER
            return

        self.contractdetails = cd = self.o.get_instrument(self.p.dataname)
        if cd is None:
            self.put_notification(self.NOTSUBSCRIBED)
            self._state = self._ST_OVER
            return

        if self.p.backfill_from is not None:
            # Seed bars from the auxiliary data source first
            self._state = self._ST_FROM
            self.p.backfill_from._start()
        else:
            self._start_finish()
            self._state = self._ST_START  # initial state for _load
            self._st_start()

        self._reconns = 0

    def _st_start(self, instart=True, tmout=None):
        '''Kicks off either a historical download or the live streaming
        connection (with optional backfilling), setting the FSM state
        accordingly. Returns ``True`` to signal success to ``_load``'''
        if self.p.historical:
            # Historical-only mode: request the candles and switch state
            self.put_notification(self.DELAYED)
            dtend = None
            if self.todate < float('inf'):
                dtend = num2date(self.todate)

            dtbegin = None
            if self.fromdate > float('-inf'):
                dtbegin = num2date(self.fromdate)

            self.qhist = self.o.candles(
                self.p.dataname, dtbegin, dtend,
                self._timeframe, self._compression,
                candleFormat=self._candleFormat,
                includeFirst=self.p.includeFirst)

            self._state = self._ST_HISTORBACK
            return True

        self.qlive = self.o.streaming_prices(self.p.dataname, tmout=tmout)
        if instart:
            self._statelivereconn = self.p.backfill_start
        else:
            self._statelivereconn = self.p.backfill

        if self._statelivereconn:
            self.put_notification(self.DELAYED)

        self._state = self._ST_LIVE
        if instart:
            # Reset the reconnection budget on a fresh start
            self._reconns = self.p.reconnections

        return True  # no return before - implicit continue

    def stop(self):
        '''Stops and tells the store to stop'''
        super(OandaData, self).stop()
        self.o.stop()

    def haslivedata(self):
        # True if a pending stored message or queued live data exists
        return bool(self._storedmsg or self.qlive)  # do not return the objs

    def _load(self):
        '''Finite State Machine delivering one bar (or tick) per call.

        Returns ``True`` when a bar was loaded, ``None`` on a live-data
        timeout and ``False`` when the feed is over/broken'''
        if self._state == self._ST_OVER:
            return False

        while True:
            if self._state == self._ST_LIVE:
                try:
                    # A message stored during backfilling takes precedence
                    msg = (self._storedmsg.pop(None, None) or
                           self.qlive.get(timeout=self._qcheck))
                except queue.Empty:
                    return None  # indicate timeout situation

                if msg is None:  # Conn broken during historical/backfilling
                    self.put_notification(self.CONNBROKEN)
                    # Try to reconnect
                    if not self.p.reconnect or self._reconns == 0:
                        # Can no longer reconnect
                        self.put_notification(self.DISCONNECTED)
                        self._state = self._ST_OVER
                        return False  # failed

                    self._reconns -= 1
                    self._st_start(instart=False, tmout=self.p.reconntimeout)
                    continue

                if 'code' in msg:
                    # Error message from the streaming connection
                    self.put_notification(self.CONNBROKEN)
                    code = msg['code']
                    if code not in [599, 598, 596]:
                        # Not a (potentially) recoverable network error
                        self.put_notification(self.DISCONNECTED)
                        self._state = self._ST_OVER
                        return False  # failed

                    if not self.p.reconnect or self._reconns == 0:
                        # Can no longer reconnect
                        self.put_notification(self.DISCONNECTED)
                        self._state = self._ST_OVER
                        return False  # failed

                    # Can reconnect
                    self._reconns -= 1
                    self._st_start(instart=False, tmout=self.p.reconntimeout)
                    continue

                # Valid message received - replenish the reconnection budget
                self._reconns = self.p.reconnections

                # Process the message according to expected return type
                if not self._statelivereconn:
                    if self._laststatus != self.LIVE:
                        if self.qlive.qsize() <= 1:  # very short live queue
                            self.put_notification(self.LIVE)

                    ret = self._load_tick(msg)
                    if ret:
                        return True

                    # could not load bar ... go and get new one
                    continue

                # Fall through to processing reconnect - try to backfill
                self._storedmsg[None] = msg  # keep the msg

                # else do a backfill
                if self._laststatus != self.DELAYED:
                    self.put_notification(self.DELAYED)

                if len(self) > 1:
                    # len == 1 ... forwarded for the 1st time
                    dtbegin = self.datetime.datetime(-1)
                elif self.fromdate > float('-inf'):
                    dtbegin = num2date(self.fromdate)
                else:  # 1st bar and no begin set
                    # passing None to fetch max possible in 1 request
                    dtbegin = None

                # Oanda timestamps are microseconds since the epoch
                dtend = datetime.utcfromtimestamp(int(msg['time']) / 10 ** 6)

                self.qhist = self.o.candles(
                    self.p.dataname, dtbegin, dtend,
                    self._timeframe, self._compression,
                    candleFormat=self._candleFormat,
                    includeFirst=self.p.includeFirst)

                self._state = self._ST_HISTORBACK
                self._statelivereconn = False  # no longer in live
                continue

            elif self._state == self._ST_HISTORBACK:
                msg = self.qhist.get()
                if msg is None:  # Conn broken during historical/backfilling
                    # Situation not managed. Simply bail out
                    self.put_notification(self.DISCONNECTED)
                    self._state = self._ST_OVER
                    return False  # error management cancelled the queue

                elif 'code' in msg:  # Error
                    self.put_notification(self.NOTSUBSCRIBED)
                    self.put_notification(self.DISCONNECTED)
                    self._state = self._ST_OVER
                    return False

                if msg:
                    if self._load_history(msg):
                        return True  # loading worked

                    continue  # not loaded ... date may have been seen
                else:
                    # End of histdata
                    if self.p.historical:  # only historical
                        self.put_notification(self.DISCONNECTED)
                        self._state = self._ST_OVER
                        return False  # end of historical

                # Live is also wished - go for it
                self._state = self._ST_LIVE
                continue

            elif self._state == self._ST_FROM:
                if not self.p.backfill_from.next():
                    # additional data source is consumed
                    self._state = self._ST_START
                    continue

                # copy lines of the same name
                for alias in self.lines.getlinealiases():
                    lsrc = getattr(self.p.backfill_from.lines, alias)
                    ldst = getattr(self.lines, alias)

                    ldst[0] = lsrc[0]

                return True

            elif self._state == self._ST_START:
                if not self._st_start(instart=False):
                    self._state = self._ST_OVER
                    return False

    def _load_tick(self, msg):
        '''Loads a streaming price tick into the current bar.

        Returns ``True`` if loaded, ``False`` if the timestamp was already
        seen (duplicate/out-of-order tick)'''
        # Oanda timestamps are microseconds since the epoch
        dtobj = datetime.utcfromtimestamp(int(msg['time']) / 10 ** 6)
        dt = date2num(dtobj)
        if dt <= self.lines.datetime[-1]:
            return False  # time already seen

        # Common fields
        self.lines.datetime[0] = dt
        self.lines.volume[0] = 0.0  # ticks deliver no volume
        self.lines.openinterest[0] = 0.0

        # Put the prices into the bar - a tick is a flat OHLC
        tick = float(msg['ask']) if self.p.useask else float(msg['bid'])
        self.lines.open[0] = tick
        self.lines.high[0] = tick
        self.lines.low[0] = tick
        self.lines.close[0] = tick

        return True

    def _load_history(self, msg):
        '''Loads a historical candle into the current bar.

        Returns ``True`` if loaded, ``False`` if the timestamp was already
        seen (overlap with previously delivered data)'''
        # Oanda timestamps are microseconds since the epoch
        dtobj = datetime.utcfromtimestamp(int(msg['time']) / 10 ** 6)
        dt = date2num(dtobj)
        if dt <= self.lines.datetime[-1]:
            return False  # time already seen

        # Common fields
        self.lines.datetime[0] = dt
        self.lines.volume[0] = float(msg['volume'])
        self.lines.openinterest[0] = 0.0

        # Put the prices into the bar - keys depend on the candleFormat
        if self.p.bidask:
            if not self.p.useask:
                self.lines.open[0] = float(msg['openBid'])
                self.lines.high[0] = float(msg['highBid'])
                self.lines.low[0] = float(msg['lowBid'])
                self.lines.close[0] = float(msg['closeBid'])
            else:
                self.lines.open[0] = float(msg['openAsk'])
                self.lines.high[0] = float(msg['highAsk'])
                self.lines.low[0] = float(msg['lowAsk'])
                self.lines.close[0] = float(msg['closeAsk'])
        else:
            self.lines.open[0] = float(msg['openMid'])
            self.lines.high[0] = float(msg['highMid'])
            self.lines.low[0] = float(msg['lowMid'])
            self.lines.close[0] = float(msg['closeMid'])

        return True
450