1#!/usr/bin/env python
2# -*- coding: utf-8; py-indent-offset:4 -*-
3###############################################################################
4#
5# Copyright (C) 2015, 2016, 2017 Daniel Rodriguez
6#
7# This program is free software: you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation, either version 3 of the License, or
10# (at your option) any later version.
11#
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program.  If not, see <http://www.gnu.org/licenses/>.
19#
20###############################################################################
21from __future__ import (absolute_import, division, print_function,
22                        unicode_literals)
23
24import collections
25from datetime import datetime, timedelta
26import time as _time
27import json
28import threading
29
30import oandapy
31import requests  # oandapy depdendency
32
33import backtrader as bt
34from backtrader.metabase import MetaParams
35from backtrader.utils.py3 import queue, with_metaclass
36from backtrader.utils import AutoDict
37
38
39# Extend the exceptions to support extra cases
40
41class OandaRequestError(oandapy.OandaError):
42    def __init__(self):
43        er = dict(code=599, message='Request Error', description='')
44        super(self.__class__, self).__init__(er)
45
46
47class OandaStreamError(oandapy.OandaError):
48    def __init__(self, content=''):
49        er = dict(code=598, message='Failed Streaming', description=content)
50        super(self.__class__, self).__init__(er)
51
52
53class OandaTimeFrameError(oandapy.OandaError):
54    def __init__(self, content):
55        er = dict(code=597, message='Not supported TimeFrame', description='')
56        super(self.__class__, self).__init__(er)
57
58
59class OandaNetworkError(oandapy.OandaError):
60    def __init__(self):
61        er = dict(code=596, message='Network Error', description='')
62        super(self.__class__, self).__init__(er)
63
64
65class API(oandapy.API):
66    def request(self, endpoint, method='GET', params=None):
67        # Overriden to make something sensible out of a
68        # request.RequestException rather than simply issuing a print(str(e))
69        url = '%s/%s' % (self.api_url, endpoint)
70
71        method = method.lower()
72        params = params or {}
73
74        func = getattr(self.client, method)
75
76        request_args = {}
77        if method == 'get':
78            request_args['params'] = params
79        else:
80            request_args['data'] = params
81
82        # Added the try block
83        try:
84            response = func(url, **request_args)
85        except requests.RequestException as e:
86            return OandaRequestError().error_response
87
88        content = response.content.decode('utf-8')
89        content = json.loads(content)
90
91        # error message
92        if response.status_code >= 400:
93            # changed from raise to return
94            return oandapy.OandaError(content).error_response
95
96        return content
97
98
99class Streamer(oandapy.Streamer):
100    def __init__(self, q, headers=None, *args, **kwargs):
101        # Override to provide headers, which is in the standard API interface
102        super(Streamer, self).__init__(*args, **kwargs)
103
104        if headers:
105            self.client.headers.update(headers)
106
107        self.q = q
108
109    def run(self, endpoint, params=None):
110        # Override to better manage exceptions.
111        # Kept as much as possible close to the original
112        self.connected = True
113
114        params = params or {}
115
116        ignore_heartbeat = None
117        if 'ignore_heartbeat' in params:
118            ignore_heartbeat = params['ignore_heartbeat']
119
120        request_args = {}
121        request_args['params'] = params
122
123        url = '%s/%s' % (self.api_url, endpoint)
124
125        while self.connected:
126            # Added exception control here
127            try:
128                response = self.client.get(url, **request_args)
129            except requests.RequestException as e:
130                self.q.put(OandaRequestError().error_response)
131                break
132
133            if response.status_code != 200:
134                self.on_error(response.content)
135                break  # added break here
136
137            # Changed chunk_size 90 -> None
138            try:
139                for line in response.iter_lines(chunk_size=None):
140                    if not self.connected:
141                        break
142
143                    if line:
144                        data = json.loads(line.decode('utf-8'))
145                        if not (ignore_heartbeat and 'heartbeat' in data):
146                            self.on_success(data)
147
148            except:  # socket.error has been seen
149                self.q.put(OandaStreamError().error_response)
150                break
151
152    def on_success(self, data):
153        if 'tick' in data:
154            self.q.put(data['tick'])
155        elif 'transaction' in data:
156            self.q.put(data['transaction'])
157
158    def on_error(self, data):
159        self.disconnect()
160        self.q.put(OandaStreamError(data).error_response)
161
162
163class MetaSingleton(MetaParams):
164    '''Metaclass to make a metaclassed class a singleton'''
165    def __init__(cls, name, bases, dct):
166        super(MetaSingleton, cls).__init__(name, bases, dct)
167        cls._singleton = None
168
169    def __call__(cls, *args, **kwargs):
170        if cls._singleton is None:
171            cls._singleton = (
172                super(MetaSingleton, cls).__call__(*args, **kwargs))
173
174        return cls._singleton
175
176
177class OandaStore(with_metaclass(MetaSingleton, object)):
178    '''Singleton class wrapping to control the connections to Oanda.
179
180    Params:
181
182      - ``token`` (default:``None``): API access token
183
184      - ``account`` (default: ``None``): account id
185
186      - ``practice`` (default: ``False``): use the test environment
187
188      - ``account_tmout`` (default: ``10.0``): refresh period for account
189        value/cash refresh
190    '''
191
192    BrokerCls = None  # broker class will autoregister
193    DataCls = None  # data class will auto register
194
195    params = (
196        ('token', ''),
197        ('account', ''),
198        ('practice', False),
199        ('account_tmout', 10.0),  # account balance refresh timeout
200    )
201
202    _DTEPOCH = datetime(1970, 1, 1)
203    _ENVPRACTICE = 'practice'
204    _ENVLIVE = 'live'
205
206    @classmethod
207    def getdata(cls, *args, **kwargs):
208        '''Returns ``DataCls`` with args, kwargs'''
209        return cls.DataCls(*args, **kwargs)
210
211    @classmethod
212    def getbroker(cls, *args, **kwargs):
213        '''Returns broker with *args, **kwargs from registered ``BrokerCls``'''
214        return cls.BrokerCls(*args, **kwargs)
215
216    def __init__(self):
217        super(OandaStore, self).__init__()
218
219        self.notifs = collections.deque()  # store notifications for cerebro
220
221        self._env = None  # reference to cerebro for general notifications
222        self.broker = None  # broker instance
223        self.datas = list()  # datas that have registered over start
224
225        self._orders = collections.OrderedDict()  # map order.ref to oid
226        self._ordersrev = collections.OrderedDict()  # map oid to order.ref
227        self._transpend = collections.defaultdict(collections.deque)
228
229        self._oenv = self._ENVPRACTICE if self.p.practice else self._ENVLIVE
230        self.oapi = API(environment=self._oenv,
231                        access_token=self.p.token,
232                        headers={'X-Accept-Datetime-Format': 'UNIX'})
233
234        self._cash = 0.0
235        self._value = 0.0
236        self._evt_acct = threading.Event()
237
238    def start(self, data=None, broker=None):
239        # Datas require some processing to kickstart data reception
240        if data is None and broker is None:
241            self.cash = None
242            return
243
244        if data is not None:
245            self._env = data._env
246            # For datas simulate a queue with None to kickstart co
247            self.datas.append(data)
248
249            if self.broker is not None:
250                self.broker.data_started(data)
251
252        elif broker is not None:
253            self.broker = broker
254            self.streaming_events()
255            self.broker_threads()
256
257    def stop(self):
258        # signal end of thread
259        if self.broker is not None:
260            self.q_ordercreate.put(None)
261            self.q_orderclose.put(None)
262            self.q_account.put(None)
263
264    def put_notification(self, msg, *args, **kwargs):
265        self.notifs.append((msg, args, kwargs))
266
267    def get_notifications(self):
268        '''Return the pending "store" notifications'''
269        self.notifs.append(None)  # put a mark / threads could still append
270        return [x for x in iter(self.notifs.popleft, None)]
271
272    # Oanda supported granularities
273    _GRANULARITIES = {
274        (bt.TimeFrame.Seconds, 5): 'S5',
275        (bt.TimeFrame.Seconds, 10): 'S10',
276        (bt.TimeFrame.Seconds, 15): 'S15',
277        (bt.TimeFrame.Seconds, 30): 'S30',
278        (bt.TimeFrame.Minutes, 1): 'M1',
279        (bt.TimeFrame.Minutes, 2): 'M3',
280        (bt.TimeFrame.Minutes, 3): 'M3',
281        (bt.TimeFrame.Minutes, 4): 'M4',
282        (bt.TimeFrame.Minutes, 5): 'M5',
283        (bt.TimeFrame.Minutes, 10): 'M5',
284        (bt.TimeFrame.Minutes, 15): 'M5',
285        (bt.TimeFrame.Minutes, 30): 'M5',
286        (bt.TimeFrame.Minutes, 60): 'H1',
287        (bt.TimeFrame.Minutes, 120): 'H2',
288        (bt.TimeFrame.Minutes, 180): 'H3',
289        (bt.TimeFrame.Minutes, 240): 'H4',
290        (bt.TimeFrame.Minutes, 360): 'H6',
291        (bt.TimeFrame.Minutes, 480): 'H8',
292        (bt.TimeFrame.Days, 1): 'D',
293        (bt.TimeFrame.Weeks, 1): 'W',
294        (bt.TimeFrame.Months, 1): 'M',
295    }
296
297    def get_positions(self):
298        try:
299            positions = self.oapi.get_positions(self.p.account)
300        except (oandapy.OandaError, OandaRequestError,):
301            return None
302
303        poslist = positions.get('positions', [])
304        return poslist
305
306    def get_granularity(self, timeframe, compression):
307        return self._GRANULARITIES.get((timeframe, compression), None)
308
309    def get_instrument(self, dataname):
310        try:
311            insts = self.oapi.get_instruments(self.p.account,
312                                              instruments=dataname)
313        except (oandapy.OandaError, OandaRequestError,):
314            return None
315
316        i = insts.get('instruments', [{}])
317        return i[0] or None
318
319    def streaming_events(self, tmout=None):
320        q = queue.Queue()
321        kwargs = {'q': q, 'tmout': tmout}
322
323        t = threading.Thread(target=self._t_streaming_listener, kwargs=kwargs)
324        t.daemon = True
325        t.start()
326
327        t = threading.Thread(target=self._t_streaming_events, kwargs=kwargs)
328        t.daemon = True
329        t.start()
330        return q
331
332    def _t_streaming_listener(self, q, tmout=None):
333        while True:
334            trans = q.get()
335            self._transaction(trans)
336
337    def _t_streaming_events(self, q, tmout=None):
338        if tmout is not None:
339            _time.sleep(tmout)
340
341        streamer = Streamer(q,
342                            environment=self._oenv,
343                            access_token=self.p.token,
344                            headers={'X-Accept-Datetime-Format': 'UNIX'})
345
346        streamer.events(ignore_heartbeat=False)
347
348    def candles(self, dataname, dtbegin, dtend, timeframe, compression,
349                candleFormat, includeFirst):
350
351        kwargs = locals().copy()
352        kwargs.pop('self')
353        kwargs['q'] = q = queue.Queue()
354        t = threading.Thread(target=self._t_candles, kwargs=kwargs)
355        t.daemon = True
356        t.start()
357        return q
358
359    def _t_candles(self, dataname, dtbegin, dtend, timeframe, compression,
360                   candleFormat, includeFirst, q):
361
362        granularity = self.get_granularity(timeframe, compression)
363        if granularity is None:
364            e = OandaTimeFrameError()
365            q.put(e.error_response)
366            return
367
368        dtkwargs = {}
369        if dtbegin is not None:
370            dtkwargs['start'] = int((dtbegin - self._DTEPOCH).total_seconds())
371
372        if dtend is not None:
373            dtkwargs['end'] = int((dtend - self._DTEPOCH).total_seconds())
374
375        try:
376            response = self.oapi.get_history(instrument=dataname,
377                                             granularity=granularity,
378                                             candleFormat=candleFormat,
379                                             **dtkwargs)
380
381        except oandapy.OandaError as e:
382            q.put(e.error_response)
383            q.put(None)
384            return
385
386        for candle in response.get('candles', []):
387            q.put(candle)
388
389        q.put({})  # end of transmission
390
391    def streaming_prices(self, dataname, tmout=None):
392        q = queue.Queue()
393        kwargs = {'q': q, 'dataname': dataname, 'tmout': tmout}
394        t = threading.Thread(target=self._t_streaming_prices, kwargs=kwargs)
395        t.daemon = True
396        t.start()
397        return q
398
399    def _t_streaming_prices(self, dataname, q, tmout):
400        if tmout is not None:
401            _time.sleep(tmout)
402
403        streamer = Streamer(q, environment=self._oenv,
404                            access_token=self.p.token,
405                            headers={'X-Accept-Datetime-Format': 'UNIX'})
406
407        streamer.rates(self.p.account, instruments=dataname)
408
409    def get_cash(self):
410        return self._cash
411
412    def get_value(self):
413        return self._value
414
415    _ORDEREXECS = {
416        bt.Order.Market: 'market',
417        bt.Order.Limit: 'limit',
418        bt.Order.Stop: 'stop',
419        bt.Order.StopLimit: 'stop',
420    }
421
422    def broker_threads(self):
423        self.q_account = queue.Queue()
424        self.q_account.put(True)  # force an immediate update
425        t = threading.Thread(target=self._t_account)
426        t.daemon = True
427        t.start()
428
429        self.q_ordercreate = queue.Queue()
430        t = threading.Thread(target=self._t_order_create)
431        t.daemon = True
432        t.start()
433
434        self.q_orderclose = queue.Queue()
435        t = threading.Thread(target=self._t_order_cancel)
436        t.daemon = True
437        t.start()
438
439        # Wait once for the values to be set
440        self._evt_acct.wait(self.p.account_tmout)
441
442    def _t_account(self):
443        while True:
444            try:
445                msg = self.q_account.get(timeout=self.p.account_tmout)
446                if msg is None:
447                    break  # end of thread
448            except queue.Empty:  # tmout -> time to refresh
449                pass
450
451            try:
452                accinfo = self.oapi.get_account(self.p.account)
453            except Exception as e:
454                self.put_notification(e)
455                continue
456
457            try:
458                self._cash = accinfo['marginAvail']
459                self._value = accinfo['balance']
460            except KeyError:
461                pass
462
463            self._evt_acct.set()
464
465    def order_create(self, order, stopside=None, takeside=None, **kwargs):
466        okwargs = dict()
467        okwargs['instrument'] = order.data._dataname
468        okwargs['units'] = abs(order.created.size)
469        okwargs['side'] = 'buy' if order.isbuy() else 'sell'
470        okwargs['type'] = self._ORDEREXECS[order.exectype]
471        if order.exectype != bt.Order.Market:
472            okwargs['price'] = order.created.price
473            if order.valid is None:
474                # 1 year and datetime.max fail ... 1 month works
475                valid = datetime.utcnow() + timedelta(days=30)
476            else:
477                valid = order.data.num2date(order.valid)
478                # To timestamp with seconds precision
479            okwargs['expiry'] = int((valid - self._DTEPOCH).total_seconds())
480
481        if order.exectype == bt.Order.StopLimit:
482            okwargs['lowerBound'] = order.created.pricelimit
483            okwargs['upperBound'] = order.created.pricelimit
484
485        if order.exectype == bt.Order.StopTrail:
486            okwargs['trailingStop'] = order.trailamount
487
488        if stopside is not None:
489            okwargs['stopLoss'] = stopside.price
490
491        if takeside is not None:
492            okwargs['takeProfit'] = takeside.price
493
494        okwargs.update(**kwargs)  # anything from the user
495
496        self.q_ordercreate.put((order.ref, okwargs,))
497        return order
498
499    _OIDSINGLE = ['orderOpened', 'tradeOpened', 'tradeReduced']
500    _OIDMULTIPLE = ['tradesClosed']
501
502    def _t_order_create(self):
503        while True:
504            msg = self.q_ordercreate.get()
505            if msg is None:
506                break
507
508            oref, okwargs = msg
509            try:
510                o = self.oapi.create_order(self.p.account, **okwargs)
511            except Exception as e:
512                self.put_notification(e)
513                self.broker._reject(oref)
514                return
515
516            # Ids are delivered in different fields and all must be fetched to
517            # match them (as executions) to the order generated here
518            oids = list()
519            for oidfield in self._OIDSINGLE:
520                if oidfield in o and 'id' in o[oidfield]:
521                    oids.append(o[oidfield]['id'])
522
523            for oidfield in self._OIDMULTIPLE:
524                if oidfield in o:
525                    for suboidfield in o[oidfield]:
526                        oids.append(suboidfield['id'])
527
528            if not oids:
529                self.broker._reject(oref)
530                return
531
532            self._orders[oref] = oids[0]
533            self.broker._submit(oref)
534            if okwargs['type'] == 'market':
535                self.broker._accept(oref)  # taken immediately
536
537            for oid in oids:
538                self._ordersrev[oid] = oref  # maps ids to backtrader order
539
540                # An transaction may have happened and was stored
541                tpending = self._transpend[oid]
542                tpending.append(None)  # eom marker
543                while True:
544                    trans = tpending.popleft()
545                    if trans is None:
546                        break
547                    self._process_transaction(oid, trans)
548
549    def order_cancel(self, order):
550        self.q_orderclose.put(order.ref)
551        return order
552
553    def _t_order_cancel(self):
554        while True:
555            oref = self.q_orderclose.get()
556            if oref is None:
557                break
558
559            oid = self._orders.get(oref, None)
560            if oid is None:
561                continue  # the order is no longer there
562            try:
563                o = self.oapi.close_order(self.p.account, oid)
564            except Exception as e:
565                continue  # not cancelled - FIXME: notify
566
567            self.broker._cancel(oref)
568
569    _X_ORDER_CREATE = ('STOP_ORDER_CREATE',
570                       'LIMIT_ORDER_CREATE', 'MARKET_IF_TOUCHED_ORDER_CREATE',)
571
572    def _transaction(self, trans):
573        # Invoked from Streaming Events. May actually receive an event for an
574        # oid which has not yet been returned after creating an order. Hence
575        # store if not yet seen, else forward to processer
576        ttype = trans['type']
577        if ttype == 'MARKET_ORDER_CREATE':
578            try:
579                oid = trans['tradeReduced']['id']
580            except KeyError:
581                try:
582                    oid = trans['tradeOpened']['id']
583                except KeyError:
584                    return  # cannot do anything else
585
586        elif ttype in self._X_ORDER_CREATE:
587            oid = trans['id']
588        elif ttype == 'ORDER_FILLED':
589            oid = trans['orderId']
590
591        elif ttype == 'ORDER_CANCEL':
592            oid = trans['orderId']
593
594        elif ttype == 'TRADE_CLOSE':
595            oid = trans['id']
596            pid = trans['tradeId']
597            if pid in self._orders and False:  # Know nothing about trade
598                return  # can do nothing
599
600            # Skip above - at the moment do nothing
601            # Received directly from an event in the WebGUI for example which
602            # closes an existing position related to order with id -> pid
603            # COULD BE DONE: Generate a fake counter order to gracefully
604            # close the existing position
605            msg = ('Received TRADE_CLOSE for unknown order, possibly generated'
606                   ' over a different client or GUI')
607            self.put_notification(msg, trans)
608            return
609
610        else:  # Go aways gracefully
611            try:
612                oid = trans['id']
613            except KeyError:
614                oid = 'None'
615
616            msg = 'Received {} with oid {}. Unknown situation'
617            msg = msg.format(ttype, oid)
618            self.put_notification(msg, trans)
619            return
620
621        try:
622            oref = self._ordersrev[oid]
623            self._process_transaction(oid, trans)
624        except KeyError:  # not yet seen, keep as pending
625            self._transpend[oid].append(trans)
626
627    _X_ORDER_FILLED = ('MARKET_ORDER_CREATE',
628                       'ORDER_FILLED', 'TAKE_PROFIT_FILLED',
629                       'STOP_LOSS_FILLED', 'TRAILING_STOP_FILLED',)
630
631    def _process_transaction(self, oid, trans):
632        try:
633            oref = self._ordersrev.pop(oid)
634        except KeyError:
635            return
636
637        ttype = trans['type']
638
639        if ttype in self._X_ORDER_FILLED:
640            size = trans['units']
641            if trans['side'] == 'sell':
642                size = -size
643            price = trans['price']
644            self.broker._fill(oref, size, price, ttype=ttype)
645
646        elif ttype in self._X_ORDER_CREATE:
647            self.broker._accept(oref)
648            self._ordersrev[oid] = oref
649
650        elif ttype in 'ORDER_CANCEL':
651            reason = trans['reason']
652            if reason == 'ORDER_FILLED':
653                pass  # individual execs have done the job
654            elif reason == 'TIME_IN_FORCE_EXPIRED':
655                self.broker._expire(oref)
656            elif reason == 'CLIENT_REQUEST':
657                self.broker._cancel(oref)
658            else:  # default action ... if nothing else
659                self.broker._reject(oref)
660