1#!/usr/bin/env python 2# -*- coding: utf-8; py-indent-offset:4 -*- 3############################################################################### 4# 5# Copyright (C) 2015, 2016, 2017 Daniel Rodriguez 6# 7# This program is free software: you can redistribute it and/or modify 8# it under the terms of the GNU General Public License as published by 9# the Free Software Foundation, either version 3 of the License, or 10# (at your option) any later version. 11# 12# This program is distributed in the hope that it will be useful, 13# but WITHOUT ANY WARRANTY; without even the implied warranty of 14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15# GNU General Public License for more details. 16# 17# You should have received a copy of the GNU General Public License 18# along with this program. If not, see <http://www.gnu.org/licenses/>. 19# 20############################################################################### 21from __future__ import (absolute_import, division, print_function, 22 unicode_literals) 23 24import collections 25from datetime import datetime, timedelta 26import time as _time 27import json 28import threading 29 30import oandapy 31import requests # oandapy depdendency 32 33import backtrader as bt 34from backtrader.metabase import MetaParams 35from backtrader.utils.py3 import queue, with_metaclass 36from backtrader.utils import AutoDict 37 38 39# Extend the exceptions to support extra cases 40 41class OandaRequestError(oandapy.OandaError): 42 def __init__(self): 43 er = dict(code=599, message='Request Error', description='') 44 super(self.__class__, self).__init__(er) 45 46 47class OandaStreamError(oandapy.OandaError): 48 def __init__(self, content=''): 49 er = dict(code=598, message='Failed Streaming', description=content) 50 super(self.__class__, self).__init__(er) 51 52 53class OandaTimeFrameError(oandapy.OandaError): 54 def __init__(self, content): 55 er = dict(code=597, message='Not supported TimeFrame', description='') 56 super(self.__class__, self).__init__(er) 57 58 59class OandaNetworkError(oandapy.OandaError): 60 def __init__(self): 61 er = dict(code=596, message='Network Error', description='') 62 super(self.__class__, self).__init__(er) 63 64 65class API(oandapy.API): 66 def request(self, endpoint, method='GET', params=None): 67 # Overriden to make something sensible out of a 68 # request.RequestException rather than simply issuing a print(str(e)) 69 url = '%s/%s' % (self.api_url, endpoint) 70 71 method = method.lower() 72 params = params or {} 73 74 func = getattr(self.client, method) 75 76 request_args = {} 77 if method == 'get': 78 request_args['params'] = params 79 else: 80 request_args['data'] = params 81 82 # Added the try block 83 try: 84 response = func(url, **request_args) 85 except requests.RequestException as e: 86 return OandaRequestError().error_response 87 88 content = response.content.decode('utf-8') 89 content = json.loads(content) 90 91 # error message 92 if response.status_code >= 400: 93 # changed from raise to return 94 return oandapy.OandaError(content).error_response 95 96 return content 97 98 99class Streamer(oandapy.Streamer): 100 def __init__(self, q, headers=None, *args, **kwargs): 101 # Override to provide headers, which is in the standard API interface 102 super(Streamer, self).__init__(*args, **kwargs) 103 104 if headers: 105 self.client.headers.update(headers) 106 107 self.q = q 108 109 def run(self, endpoint, params=None): 110 # Override to better manage exceptions. 111 # Kept as much as possible close to the original 112 self.connected = True 113 114 params = params or {} 115 116 ignore_heartbeat = None 117 if 'ignore_heartbeat' in params: 118 ignore_heartbeat = params['ignore_heartbeat'] 119 120 request_args = {} 121 request_args['params'] = params 122 123 url = '%s/%s' % (self.api_url, endpoint) 124 125 while self.connected: 126 # Added exception control here 127 try: 128 response = self.client.get(url, **request_args) 129 except requests.RequestException as e: 130 self.q.put(OandaRequestError().error_response) 131 break 132 133 if response.status_code != 200: 134 self.on_error(response.content) 135 break # added break here 136 137 # Changed chunk_size 90 -> None 138 try: 139 for line in response.iter_lines(chunk_size=None): 140 if not self.connected: 141 break 142 143 if line: 144 data = json.loads(line.decode('utf-8')) 145 if not (ignore_heartbeat and 'heartbeat' in data): 146 self.on_success(data) 147 148 except: # socket.error has been seen 149 self.q.put(OandaStreamError().error_response) 150 break 151 152 def on_success(self, data): 153 if 'tick' in data: 154 self.q.put(data['tick']) 155 elif 'transaction' in data: 156 self.q.put(data['transaction']) 157 158 def on_error(self, data): 159 self.disconnect() 160 self.q.put(OandaStreamError(data).error_response) 161 162 163class MetaSingleton(MetaParams): 164 '''Metaclass to make a metaclassed class a singleton''' 165 def __init__(cls, name, bases, dct): 166 super(MetaSingleton, cls).__init__(name, bases, dct) 167 cls._singleton = None 168 169 def __call__(cls, *args, **kwargs): 170 if cls._singleton is None: 171 cls._singleton = ( 172 super(MetaSingleton, cls).__call__(*args, **kwargs)) 173 174 return cls._singleton 175 176 177class OandaStore(with_metaclass(MetaSingleton, object)): 178 '''Singleton class wrapping to control the connections to Oanda. 179 180 Params: 181 182 - ``token`` (default:``None``): API access token 183 184 - ``account`` (default: ``None``): account id 185 186 - ``practice`` (default: ``False``): use the test environment 187 188 - ``account_tmout`` (default: ``10.0``): refresh period for account 189 value/cash refresh 190 ''' 191 192 BrokerCls = None # broker class will autoregister 193 DataCls = None # data class will auto register 194 195 params = ( 196 ('token', ''), 197 ('account', ''), 198 ('practice', False), 199 ('account_tmout', 10.0), # account balance refresh timeout 200 ) 201 202 _DTEPOCH = datetime(1970, 1, 1) 203 _ENVPRACTICE = 'practice' 204 _ENVLIVE = 'live' 205 206 @classmethod 207 def getdata(cls, *args, **kwargs): 208 '''Returns ``DataCls`` with args, kwargs''' 209 return cls.DataCls(*args, **kwargs) 210 211 @classmethod 212 def getbroker(cls, *args, **kwargs): 213 '''Returns broker with *args, **kwargs from registered ``BrokerCls``''' 214 return cls.BrokerCls(*args, **kwargs) 215 216 def __init__(self): 217 super(OandaStore, self).__init__() 218 219 self.notifs = collections.deque() # store notifications for cerebro 220 221 self._env = None # reference to cerebro for general notifications 222 self.broker = None # broker instance 223 self.datas = list() # datas that have registered over start 224 225 self._orders = collections.OrderedDict() # map order.ref to oid 226 self._ordersrev = collections.OrderedDict() # map oid to order.ref 227 self._transpend = collections.defaultdict(collections.deque) 228 229 self._oenv = self._ENVPRACTICE if self.p.practice else self._ENVLIVE 230 self.oapi = API(environment=self._oenv, 231 access_token=self.p.token, 232 headers={'X-Accept-Datetime-Format': 'UNIX'}) 233 234 self._cash = 0.0 235 self._value = 0.0 236 self._evt_acct = threading.Event() 237 238 def start(self, data=None, broker=None): 239 # Datas require some processing to kickstart data reception 240 if data is None and broker is None: 241 self.cash = None 242 return 243 244 if data is not None: 245 self._env = data._env 246 # For datas simulate a queue with None to kickstart co 247 self.datas.append(data) 248 249 if self.broker is not None: 250 self.broker.data_started(data) 251 252 elif broker is not None: 253 self.broker = broker 254 self.streaming_events() 255 self.broker_threads() 256 257 def stop(self): 258 # signal end of thread 259 if self.broker is not None: 260 self.q_ordercreate.put(None) 261 self.q_orderclose.put(None) 262 self.q_account.put(None) 263 264 def put_notification(self, msg, *args, **kwargs): 265 self.notifs.append((msg, args, kwargs)) 266 267 def get_notifications(self): 268 '''Return the pending "store" notifications''' 269 self.notifs.append(None) # put a mark / threads could still append 270 return [x for x in iter(self.notifs.popleft, None)] 271 272 # Oanda supported granularities 273 _GRANULARITIES = { 274 (bt.TimeFrame.Seconds, 5): 'S5', 275 (bt.TimeFrame.Seconds, 10): 'S10', 276 (bt.TimeFrame.Seconds, 15): 'S15', 277 (bt.TimeFrame.Seconds, 30): 'S30', 278 (bt.TimeFrame.Minutes, 1): 'M1', 279 (bt.TimeFrame.Minutes, 2): 'M3', 280 (bt.TimeFrame.Minutes, 3): 'M3', 281 (bt.TimeFrame.Minutes, 4): 'M4', 282 (bt.TimeFrame.Minutes, 5): 'M5', 283 (bt.TimeFrame.Minutes, 10): 'M5', 284 (bt.TimeFrame.Minutes, 15): 'M5', 285 (bt.TimeFrame.Minutes, 30): 'M5', 286 (bt.TimeFrame.Minutes, 60): 'H1', 287 (bt.TimeFrame.Minutes, 120): 'H2', 288 (bt.TimeFrame.Minutes, 180): 'H3', 289 (bt.TimeFrame.Minutes, 240): 'H4', 290 (bt.TimeFrame.Minutes, 360): 'H6', 291 (bt.TimeFrame.Minutes, 480): 'H8', 292 (bt.TimeFrame.Days, 1): 'D', 293 (bt.TimeFrame.Weeks, 1): 'W', 294 (bt.TimeFrame.Months, 1): 'M', 295 } 296 297 def get_positions(self): 298 try: 299 positions = self.oapi.get_positions(self.p.account) 300 except (oandapy.OandaError, OandaRequestError,): 301 return None 302 303 poslist = positions.get('positions', []) 304 return poslist 305 306 def get_granularity(self, timeframe, compression): 307 return self._GRANULARITIES.get((timeframe, compression), None) 308 309 def get_instrument(self, dataname): 310 try: 311 insts = self.oapi.get_instruments(self.p.account, 312 instruments=dataname) 313 except (oandapy.OandaError, OandaRequestError,): 314 return None 315 316 i = insts.get('instruments', [{}]) 317 return i[0] or None 318 319 def streaming_events(self, tmout=None): 320 q = queue.Queue() 321 kwargs = {'q': q, 'tmout': tmout} 322 323 t = threading.Thread(target=self._t_streaming_listener, kwargs=kwargs) 324 t.daemon = True 325 t.start() 326 327 t = threading.Thread(target=self._t_streaming_events, kwargs=kwargs) 328 t.daemon = True 329 t.start() 330 return q 331 332 def _t_streaming_listener(self, q, tmout=None): 333 while True: 334 trans = q.get() 335 self._transaction(trans) 336 337 def _t_streaming_events(self, q, tmout=None): 338 if tmout is not None: 339 _time.sleep(tmout) 340 341 streamer = Streamer(q, 342 environment=self._oenv, 343 access_token=self.p.token, 344 headers={'X-Accept-Datetime-Format': 'UNIX'}) 345 346 streamer.events(ignore_heartbeat=False) 347 348 def candles(self, dataname, dtbegin, dtend, timeframe, compression, 349 candleFormat, includeFirst): 350 351 kwargs = locals().copy() 352 kwargs.pop('self') 353 kwargs['q'] = q = queue.Queue() 354 t = threading.Thread(target=self._t_candles, kwargs=kwargs) 355 t.daemon = True 356 t.start() 357 return q 358 359 def _t_candles(self, dataname, dtbegin, dtend, timeframe, compression, 360 candleFormat, includeFirst, q): 361 362 granularity = self.get_granularity(timeframe, compression) 363 if granularity is None: 364 e = OandaTimeFrameError() 365 q.put(e.error_response) 366 return 367 368 dtkwargs = {} 369 if dtbegin is not None: 370 dtkwargs['start'] = int((dtbegin - self._DTEPOCH).total_seconds()) 371 372 if dtend is not None: 373 dtkwargs['end'] = int((dtend - self._DTEPOCH).total_seconds()) 374 375 try: 376 response = self.oapi.get_history(instrument=dataname, 377 granularity=granularity, 378 candleFormat=candleFormat, 379 **dtkwargs) 380 381 except oandapy.OandaError as e: 382 q.put(e.error_response) 383 q.put(None) 384 return 385 386 for candle in response.get('candles', []): 387 q.put(candle) 388 389 q.put({}) # end of transmission 390 391 def streaming_prices(self, dataname, tmout=None): 392 q = queue.Queue() 393 kwargs = {'q': q, 'dataname': dataname, 'tmout': tmout} 394 t = threading.Thread(target=self._t_streaming_prices, kwargs=kwargs) 395 t.daemon = True 396 t.start() 397 return q 398 399 def _t_streaming_prices(self, dataname, q, tmout): 400 if tmout is not None: 401 _time.sleep(tmout) 402 403 streamer = Streamer(q, environment=self._oenv, 404 access_token=self.p.token, 405 headers={'X-Accept-Datetime-Format': 'UNIX'}) 406 407 streamer.rates(self.p.account, instruments=dataname) 408 409 def get_cash(self): 410 return self._cash 411 412 def get_value(self): 413 return self._value 414 415 _ORDEREXECS = { 416 bt.Order.Market: 'market', 417 bt.Order.Limit: 'limit', 418 bt.Order.Stop: 'stop', 419 bt.Order.StopLimit: 'stop', 420 } 421 422 def broker_threads(self): 423 self.q_account = queue.Queue() 424 self.q_account.put(True) # force an immediate update 425 t = threading.Thread(target=self._t_account) 426 t.daemon = True 427 t.start() 428 429 self.q_ordercreate = queue.Queue() 430 t = threading.Thread(target=self._t_order_create) 431 t.daemon = True 432 t.start() 433 434 self.q_orderclose = queue.Queue() 435 t = threading.Thread(target=self._t_order_cancel) 436 t.daemon = True 437 t.start() 438 439 # Wait once for the values to be set 440 self._evt_acct.wait(self.p.account_tmout) 441 442 def _t_account(self): 443 while True: 444 try: 445 msg = self.q_account.get(timeout=self.p.account_tmout) 446 if msg is None: 447 break # end of thread 448 except queue.Empty: # tmout -> time to refresh 449 pass 450 451 try: 452 accinfo = self.oapi.get_account(self.p.account) 453 except Exception as e: 454 self.put_notification(e) 455 continue 456 457 try: 458 self._cash = accinfo['marginAvail'] 459 self._value = accinfo['balance'] 460 except KeyError: 461 pass 462 463 self._evt_acct.set() 464 465 def order_create(self, order, stopside=None, takeside=None, **kwargs): 466 okwargs = dict() 467 okwargs['instrument'] = order.data._dataname 468 okwargs['units'] = abs(order.created.size) 469 okwargs['side'] = 'buy' if order.isbuy() else 'sell' 470 okwargs['type'] = self._ORDEREXECS[order.exectype] 471 if order.exectype != bt.Order.Market: 472 okwargs['price'] = order.created.price 473 if order.valid is None: 474 # 1 year and datetime.max fail ... 1 month works 475 valid = datetime.utcnow() + timedelta(days=30) 476 else: 477 valid = order.data.num2date(order.valid) 478 # To timestamp with seconds precision 479 okwargs['expiry'] = int((valid - self._DTEPOCH).total_seconds()) 480 481 if order.exectype == bt.Order.StopLimit: 482 okwargs['lowerBound'] = order.created.pricelimit 483 okwargs['upperBound'] = order.created.pricelimit 484 485 if order.exectype == bt.Order.StopTrail: 486 okwargs['trailingStop'] = order.trailamount 487 488 if stopside is not None: 489 okwargs['stopLoss'] = stopside.price 490 491 if takeside is not None: 492 okwargs['takeProfit'] = takeside.price 493 494 okwargs.update(**kwargs) # anything from the user 495 496 self.q_ordercreate.put((order.ref, okwargs,)) 497 return order 498 499 _OIDSINGLE = ['orderOpened', 'tradeOpened', 'tradeReduced'] 500 _OIDMULTIPLE = ['tradesClosed'] 501 502 def _t_order_create(self): 503 while True: 504 msg = self.q_ordercreate.get() 505 if msg is None: 506 break 507 508 oref, okwargs = msg 509 try: 510 o = self.oapi.create_order(self.p.account, **okwargs) 511 except Exception as e: 512 self.put_notification(e) 513 self.broker._reject(oref) 514 return 515 516 # Ids are delivered in different fields and all must be fetched to 517 # match them (as executions) to the order generated here 518 oids = list() 519 for oidfield in self._OIDSINGLE: 520 if oidfield in o and 'id' in o[oidfield]: 521 oids.append(o[oidfield]['id']) 522 523 for oidfield in self._OIDMULTIPLE: 524 if oidfield in o: 525 for suboidfield in o[oidfield]: 526 oids.append(suboidfield['id']) 527 528 if not oids: 529 self.broker._reject(oref) 530 return 531 532 self._orders[oref] = oids[0] 533 self.broker._submit(oref) 534 if okwargs['type'] == 'market': 535 self.broker._accept(oref) # taken immediately 536 537 for oid in oids: 538 self._ordersrev[oid] = oref # maps ids to backtrader order 539 540 # An transaction may have happened and was stored 541 tpending = self._transpend[oid] 542 tpending.append(None) # eom marker 543 while True: 544 trans = tpending.popleft() 545 if trans is None: 546 break 547 self._process_transaction(oid, trans) 548 549 def order_cancel(self, order): 550 self.q_orderclose.put(order.ref) 551 return order 552 553 def _t_order_cancel(self): 554 while True: 555 oref = self.q_orderclose.get() 556 if oref is None: 557 break 558 559 oid = self._orders.get(oref, None) 560 if oid is None: 561 continue # the order is no longer there 562 try: 563 o = self.oapi.close_order(self.p.account, oid) 564 except Exception as e: 565 continue # not cancelled - FIXME: notify 566 567 self.broker._cancel(oref) 568 569 _X_ORDER_CREATE = ('STOP_ORDER_CREATE', 570 'LIMIT_ORDER_CREATE', 'MARKET_IF_TOUCHED_ORDER_CREATE',) 571 572 def _transaction(self, trans): 573 # Invoked from Streaming Events. May actually receive an event for an 574 # oid which has not yet been returned after creating an order. Hence 575 # store if not yet seen, else forward to processer 576 ttype = trans['type'] 577 if ttype == 'MARKET_ORDER_CREATE': 578 try: 579 oid = trans['tradeReduced']['id'] 580 except KeyError: 581 try: 582 oid = trans['tradeOpened']['id'] 583 except KeyError: 584 return # cannot do anything else 585 586 elif ttype in self._X_ORDER_CREATE: 587 oid = trans['id'] 588 elif ttype == 'ORDER_FILLED': 589 oid = trans['orderId'] 590 591 elif ttype == 'ORDER_CANCEL': 592 oid = trans['orderId'] 593 594 elif ttype == 'TRADE_CLOSE': 595 oid = trans['id'] 596 pid = trans['tradeId'] 597 if pid in self._orders and False: # Know nothing about trade 598 return # can do nothing 599 600 # Skip above - at the moment do nothing 601 # Received directly from an event in the WebGUI for example which 602 # closes an existing position related to order with id -> pid 603 # COULD BE DONE: Generate a fake counter order to gracefully 604 # close the existing position 605 msg = ('Received TRADE_CLOSE for unknown order, possibly generated' 606 ' over a different client or GUI') 607 self.put_notification(msg, trans) 608 return 609 610 else: # Go aways gracefully 611 try: 612 oid = trans['id'] 613 except KeyError: 614 oid = 'None' 615 616 msg = 'Received {} with oid {}. Unknown situation' 617 msg = msg.format(ttype, oid) 618 self.put_notification(msg, trans) 619 return 620 621 try: 622 oref = self._ordersrev[oid] 623 self._process_transaction(oid, trans) 624 except KeyError: # not yet seen, keep as pending 625 self._transpend[oid].append(trans) 626 627 _X_ORDER_FILLED = ('MARKET_ORDER_CREATE', 628 'ORDER_FILLED', 'TAKE_PROFIT_FILLED', 629 'STOP_LOSS_FILLED', 'TRAILING_STOP_FILLED',) 630 631 def _process_transaction(self, oid, trans): 632 try: 633 oref = self._ordersrev.pop(oid) 634 except KeyError: 635 return 636 637 ttype = trans['type'] 638 639 if ttype in self._X_ORDER_FILLED: 640 size = trans['units'] 641 if trans['side'] == 'sell': 642 size = -size 643 price = trans['price'] 644 self.broker._fill(oref, size, price, ttype=ttype) 645 646 elif ttype in self._X_ORDER_CREATE: 647 self.broker._accept(oref) 648 self._ordersrev[oid] = oref 649 650 elif ttype in 'ORDER_CANCEL': 651 reason = trans['reason'] 652 if reason == 'ORDER_FILLED': 653 pass # individual execs have done the job 654 elif reason == 'TIME_IN_FORCE_EXPIRED': 655 self.broker._expire(oref) 656 elif reason == 'CLIENT_REQUEST': 657 self.broker._cancel(oref) 658 else: # default action ... if nothing else 659 self.broker._reject(oref) 660