1#!/usr/bin/env python 2# -*- coding: utf-8; py-indent-offset:4 -*- 3############################################################################### 4# 5# Copyright (C) 2015, 2016, 2017 Daniel Rodriguez 6# 7# This program is free software: you can redistribute it and/or modify 8# it under the terms of the GNU General Public License as published by 9# the Free Software Foundation, either version 3 of the License, or 10# (at your option) any later version. 11# 12# This program is distributed in the hope that it will be useful, 13# but WITHOUT ANY WARRANTY; without even the implied warranty of 14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15# GNU General Public License for more details. 16# 17# You should have received a copy of the GNU General Public License 18# along with this program. If not, see <http://www.gnu.org/licenses/>. 19# 20############################################################################### 21from __future__ import (absolute_import, division, print_function, 22 unicode_literals) 23 24import collections 25import datetime 26import inspect 27import io 28import os.path 29 30import backtrader as bt 31from backtrader import (date2num, num2date, time2num, TimeFrame, dataseries, 32 metabase) 33 34from backtrader.utils.py3 import with_metaclass, zip, range, string_types 35from backtrader.utils import tzparse 36from .dataseries import SimpleFilterWrapper 37from .resamplerfilter import Resampler, Replayer 38from .tradingcal import PandasMarketCalendar 39 40 41class MetaAbstractDataBase(dataseries.OHLCDateTime.__class__): 42 _indcol = dict() 43 44 def __init__(cls, name, bases, dct): 45 ''' 46 Class has already been created ... register subclasses 47 ''' 48 # Initialize the class 49 super(MetaAbstractDataBase, cls).__init__(name, bases, dct) 50 51 if not cls.aliased and \ 52 name != 'DataBase' and not name.startswith('_'): 53 cls._indcol[name] = cls 54 55 def dopreinit(cls, _obj, *args, **kwargs): 56 _obj, args, kwargs = \ 57 super(MetaAbstractDataBase, cls).dopreinit(_obj, *args, **kwargs) 58 59 # Find the owner and store it 60 _obj._feed = metabase.findowner(_obj, FeedBase) 61 62 _obj.notifs = collections.deque() # store notifications for cerebro 63 64 _obj._dataname = _obj.p.dataname 65 _obj._name = '' 66 return _obj, args, kwargs 67 68 def dopostinit(cls, _obj, *args, **kwargs): 69 _obj, args, kwargs = \ 70 super(MetaAbstractDataBase, cls).dopostinit(_obj, *args, **kwargs) 71 72 # Either set by subclass or the parameter or use the dataname (ticker) 73 _obj._name = _obj._name or _obj.p.name 74 if not _obj._name and isinstance(_obj.p.dataname, string_types): 75 _obj._name = _obj.p.dataname 76 _obj._compression = _obj.p.compression 77 _obj._timeframe = _obj.p.timeframe 78 79 if isinstance(_obj.p.sessionstart, datetime.datetime): 80 _obj.p.sessionstart = _obj.p.sessionstart.time() 81 82 elif _obj.p.sessionstart is None: 83 _obj.p.sessionstart = datetime.time.min 84 85 if isinstance(_obj.p.sessionend, datetime.datetime): 86 _obj.p.sessionend = _obj.p.sessionend.time() 87 88 elif _obj.p.sessionend is None: 89 # remove 9 to avoid precision rounding errors 90 _obj.p.sessionend = datetime.time(23, 59, 59, 999990) 91 92 if isinstance(_obj.p.fromdate, datetime.date): 93 # push it to the end of the day, or else intraday 94 # values before the end of the day would be gone 95 if not hasattr(_obj.p.fromdate, 'hour'): 96 _obj.p.fromdate = datetime.datetime.combine( 97 _obj.p.fromdate, _obj.p.sessionstart) 98 99 if isinstance(_obj.p.todate, datetime.date): 100 # push it to the end of the day, or else intraday 101 # values before the end of the day would be gone 102 if not hasattr(_obj.p.todate, 'hour'): 103 _obj.p.todate = datetime.datetime.combine( 104 _obj.p.todate, _obj.p.sessionend) 105 106 _obj._barstack = collections.deque() # for filter operations 107 _obj._barstash = collections.deque() # for filter operations 108 109 _obj._filters = list() 110 _obj._ffilters = list() 111 for fp in _obj.p.filters: 112 if inspect.isclass(fp): 113 fp = fp(_obj) 114 if hasattr(fp, 'last'): 115 _obj._ffilters.append((fp, [], {})) 116 117 _obj._filters.append((fp, [], {})) 118 119 return _obj, args, kwargs 120 121 122class AbstractDataBase(with_metaclass(MetaAbstractDataBase, 123 dataseries.OHLCDateTime)): 124 125 params = ( 126 ('dataname', None), 127 ('name', ''), 128 ('compression', 1), 129 ('timeframe', TimeFrame.Days), 130 ('fromdate', None), 131 ('todate', None), 132 ('sessionstart', None), 133 ('sessionend', None), 134 ('filters', []), 135 ('tz', None), 136 ('tzinput', None), 137 ('qcheck', 0.0), # timeout in seconds (float) to check for events 138 ('calendar', None), 139 ) 140 141 (CONNECTED, DISCONNECTED, CONNBROKEN, DELAYED, 142 LIVE, NOTSUBSCRIBED, NOTSUPPORTED_TF, UNKNOWN) = range(8) 143 144 _NOTIFNAMES = [ 145 'CONNECTED', 'DISCONNECTED', 'CONNBROKEN', 'DELAYED', 146 'LIVE', 'NOTSUBSCRIBED', 'NOTSUPPORTED_TIMEFRAME', 'UNKNOWN'] 147 148 @classmethod 149 def _getstatusname(cls, status): 150 return cls._NOTIFNAMES[status] 151 152 _compensate = None 153 _feed = None 154 _store = None 155 156 _clone = False 157 _qcheck = 0.0 158 159 _tmoffset = datetime.timedelta() 160 161 # Set to non 0 if resampling/replaying 162 resampling = 0 163 replaying = 0 164 165 _started = False 166 167 def _start_finish(self): 168 # A live feed (for example) may have learnt something about the 169 # timezones after the start and that's why the date/time related 170 # parameters are converted at this late stage 171 # Get the output timezone (if any) 172 self._tz = self._gettz() 173 # Lines have already been create, set the tz 174 self.lines.datetime._settz(self._tz) 175 176 # This should probably be also called from an override-able method 177 self._tzinput = bt.utils.date.Localizer(self._gettzinput()) 178 179 # Convert user input times to the output timezone (or min/max) 180 if self.p.fromdate is None: 181 self.fromdate = float('-inf') 182 else: 183 self.fromdate = self.date2num(self.p.fromdate) 184 185 if self.p.todate is None: 186 self.todate = float('inf') 187 else: 188 self.todate = self.date2num(self.p.todate) 189 190 # FIXME: These two are never used and could be removed 191 self.sessionstart = time2num(self.p.sessionstart) 192 self.sessionend = time2num(self.p.sessionend) 193 194 self._calendar = cal = self.p.calendar 195 if cal is None: 196 self._calendar = self._env._tradingcal 197 elif isinstance(cal, string_types): 198 self._calendar = PandasMarketCalendar(calendar=cal) 199 200 self._started = True 201 202 def _start(self): 203 self.start() 204 205 if not self._started: 206 self._start_finish() 207 208 def _timeoffset(self): 209 return self._tmoffset 210 211 def _getnexteos(self): 212 '''Returns the next eos using a trading calendar if available''' 213 if self._clone: 214 return self.data._getnexteos() 215 216 if not len(self): 217 return datetime.datetime.min, 0.0 218 219 dt = self.lines.datetime[0] 220 dtime = num2date(dt) 221 if self._calendar is None: 222 nexteos = datetime.datetime.combine(dtime, self.p.sessionend) 223 nextdteos = self.date2num(nexteos) # locl'ed -> utc-like 224 nexteos = num2date(nextdteos) # utc 225 while dtime > nexteos: 226 nexteos += datetime.timedelta(days=1) # already utc-like 227 228 nextdteos = date2num(nexteos) # -> utc-like 229 230 else: 231 # returns times in utc 232 _, nexteos = self._calendar.schedule(dtime, self._tz) 233 nextdteos = date2num(nexteos) # nextos is already utc 234 235 return nexteos, nextdteos 236 237 def _gettzinput(self): 238 '''Can be overriden by classes to return a timezone for input''' 239 return tzparse(self.p.tzinput) 240 241 def _gettz(self): 242 '''To be overriden by subclasses which may auto-calculate the 243 timezone''' 244 return tzparse(self.p.tz) 245 246 def date2num(self, dt): 247 if self._tz is not None: 248 return date2num(self._tz.localize(dt)) 249 250 return date2num(dt) 251 252 def num2date(self, dt=None, tz=None, naive=True): 253 if dt is None: 254 return num2date(self.lines.datetime[0], tz or self._tz, naive) 255 256 return num2date(dt, tz or self._tz, naive) 257 258 def haslivedata(self): 259 return False # must be overriden for those that can 260 261 def do_qcheck(self, onoff, qlapse): 262 # if onoff is True the data will wait p.qcheck for incoming live data 263 # on its queue. 264 qwait = self.p.qcheck if onoff else 0.0 265 qwait = max(0.0, qwait - qlapse) 266 self._qcheck = qwait 267 268 def islive(self): 269 '''If this returns True, ``Cerebro`` will deactivate ``preload`` and 270 ``runonce`` because a live data source must be fetched tick by tick (or 271 bar by bar)''' 272 return False 273 274 def put_notification(self, status, *args, **kwargs): 275 '''Add arguments to notification queue''' 276 if self._laststatus != status: 277 self.notifs.append((status, args, kwargs)) 278 self._laststatus = status 279 280 def get_notifications(self): 281 '''Return the pending "store" notifications''' 282 # The background thread could keep on adding notifications. The None 283 # mark allows to identify which is the last notification to deliver 284 self.notifs.append(None) # put a mark 285 notifs = list() 286 while True: 287 notif = self.notifs.popleft() 288 if notif is None: # mark is reached 289 break 290 notifs.append(notif) 291 292 return notifs 293 294 def getfeed(self): 295 return self._feed 296 297 def qbuffer(self, savemem=0, replaying=False): 298 extrasize = self.resampling or replaying 299 for line in self.lines: 300 line.qbuffer(savemem=savemem, extrasize=extrasize) 301 302 def start(self): 303 self._barstack = collections.deque() 304 self._barstash = collections.deque() 305 self._laststatus = self.CONNECTED 306 307 def stop(self): 308 pass 309 310 def clone(self, **kwargs): 311 return DataClone(dataname=self, **kwargs) 312 313 def copyas(self, _dataname, **kwargs): 314 d = DataClone(dataname=self, **kwargs) 315 d._dataname = _dataname 316 d._name = _dataname 317 return d 318 319 def setenvironment(self, env): 320 '''Keep a reference to the environment''' 321 self._env = env 322 323 def getenvironment(self): 324 return self._env 325 326 def addfilter_simple(self, f, *args, **kwargs): 327 fp = SimpleFilterWrapper(self, f, *args, **kwargs) 328 self._filters.append((fp, fp.args, fp.kwargs)) 329 330 def addfilter(self, p, *args, **kwargs): 331 if inspect.isclass(p): 332 pobj = p(self, *args, **kwargs) 333 self._filters.append((pobj, [], {})) 334 335 if hasattr(pobj, 'last'): 336 self._ffilters.append((pobj, [], {})) 337 338 else: 339 self._filters.append((p, args, kwargs)) 340 341 def compensate(self, other): 342 '''Call it to let the broker know that actions on this asset will 343 compensate open positions in another''' 344 345 self._compensate = other 346 347 def _tick_nullify(self): 348 # These are the updating prices in case the new bar is "updated" 349 # and the length doesn't change like if a replay is happening or 350 # a real-time data feed is in use and 1 minutes bars are being 351 # constructed with 5 seconds updates 352 for lalias in self.getlinealiases(): 353 if lalias != 'datetime': 354 setattr(self, 'tick_' + lalias, None) 355 356 self.tick_last = None 357 358 def _tick_fill(self, force=False): 359 # If nothing filled the tick_xxx attributes, the bar is the tick 360 alias0 = self._getlinealias(0) 361 if force or getattr(self, 'tick_' + alias0, None) is None: 362 for lalias in self.getlinealiases(): 363 if lalias != 'datetime': 364 setattr(self, 'tick_' + lalias, 365 getattr(self.lines, lalias)[0]) 366 367 self.tick_last = getattr(self.lines, alias0)[0] 368 369 def advance_peek(self): 370 if len(self) < self.buflen(): 371 return self.lines.datetime[1] # return the future 372 373 return float('inf') # max date else 374 375 def advance(self, size=1, datamaster=None, ticks=True): 376 if ticks: 377 self._tick_nullify() 378 379 # Need intercepting this call to support datas with 380 # different lengths (timeframes) 381 self.lines.advance(size) 382 383 if datamaster is not None: 384 if len(self) > self.buflen(): 385 # if no bar can be delivered, fill with an empty bar 386 self.rewind() 387 self.lines.forward() 388 return 389 390 if self.lines.datetime[0] > datamaster.lines.datetime[0]: 391 self.lines.rewind() 392 else: 393 if ticks: 394 self._tick_fill() 395 elif len(self) < self.buflen(): 396 # a resampler may have advance us past the last point 397 if ticks: 398 self._tick_fill() 399 400 def next(self, datamaster=None, ticks=True): 401 402 if len(self) >= self.buflen(): 403 if ticks: 404 self._tick_nullify() 405 406 # not preloaded - request next bar 407 ret = self.load() 408 if not ret: 409 # if load cannot produce bars - forward the result 410 return ret 411 412 if datamaster is None: 413 # bar is there and no master ... return load's result 414 if ticks: 415 self._tick_fill() 416 return ret 417 else: 418 self.advance(ticks=ticks) 419 420 # a bar is "loaded" or was preloaded - index has been moved to it 421 if datamaster is not None: 422 # there is a time reference to check against 423 if self.lines.datetime[0] > datamaster.lines.datetime[0]: 424 # can't deliver new bar, too early, go back 425 self.rewind() 426 else: 427 if ticks: 428 self._tick_fill() 429 430 else: 431 if ticks: 432 self._tick_fill() 433 434 # tell the world there is a bar (either the new or the previous 435 return True 436 437 def preload(self): 438 while self.load(): 439 pass 440 441 self._last() 442 self.home() 443 444 def _last(self, datamaster=None): 445 # Last chance for filters to deliver something 446 ret = 0 447 for ff, fargs, fkwargs in self._ffilters: 448 ret += ff.last(self, *fargs, **fkwargs) 449 450 doticks = False 451 if datamaster is not None and self._barstack: 452 doticks = True 453 454 while self._fromstack(forward=True): 455 # consume bar(s) produced by "last"s - adding room 456 pass 457 458 if doticks: 459 self._tick_fill() 460 461 return bool(ret) 462 463 def _check(self, forcedata=None): 464 ret = 0 465 for ff, fargs, fkwargs in self._filters: 466 if not hasattr(ff, 'check'): 467 continue 468 ff.check(self, _forcedata=forcedata, *fargs, **fkwargs) 469 470 def load(self): 471 while True: 472 # move data pointer forward for new bar 473 self.forward() 474 475 if self._fromstack(): # bar is available 476 return True 477 478 if not self._fromstack(stash=True): 479 _loadret = self._load() 480 if not _loadret: # no bar use force to make sure in exactbars 481 # the pointer is undone this covers especially (but not 482 # uniquely) the case in which the last bar has been seen 483 # and a backwards would ruin pointer accounting in the 484 # "stop" method of the strategy 485 self.backwards(force=True) # undo data pointer 486 487 # return the actual returned value which may be None to 488 # signal no bar is available, but the data feed is not 489 # done. False means game over 490 return _loadret 491 492 # Get a reference to current loaded time 493 dt = self.lines.datetime[0] 494 495 # A bar has been loaded, adapt the time 496 if self._tzinput: 497 # Input has been converted at face value but it's not UTC in 498 # the input stream 499 dtime = num2date(dt) # get it in a naive datetime 500 # localize it 501 dtime = self._tzinput.localize(dtime) # pytz compatible-ized 502 self.lines.datetime[0] = dt = date2num(dtime) # keep UTC val 503 504 # Check standard date from/to filters 505 if dt < self.fromdate: 506 # discard loaded bar and carry on 507 self.backwards() 508 continue 509 if dt > self.todate: 510 # discard loaded bar and break out 511 self.backwards(force=True) 512 break 513 514 # Pass through filters 515 retff = False 516 for ff, fargs, fkwargs in self._filters: 517 # previous filter may have put things onto the stack 518 if self._barstack: 519 for i in range(len(self._barstack)): 520 self._fromstack(forward=True) 521 retff = ff(self, *fargs, **fkwargs) 522 else: 523 retff = ff(self, *fargs, **fkwargs) 524 525 if retff: # bar removed from systemn 526 break # out of the inner loop 527 528 if retff: # bar removed from system - loop to get new bar 529 continue # in the greater loop 530 531 # Checks let the bar through ... notify it 532 return True 533 534 # Out of the loop ... no more bars or past todate 535 return False 536 537 def _load(self): 538 return False 539 540 def _add2stack(self, bar, stash=False): 541 '''Saves given bar (list of values) to the stack for later retrieval''' 542 if not stash: 543 self._barstack.append(bar) 544 else: 545 self._barstash.append(bar) 546 547 def _save2stack(self, erase=False, force=False, stash=False): 548 '''Saves current bar to the bar stack for later retrieval 549 550 Parameter ``erase`` determines removal from the data stream 551 ''' 552 bar = [line[0] for line in self.itersize()] 553 if not stash: 554 self._barstack.append(bar) 555 else: 556 self._barstash.append(bar) 557 558 if erase: # remove bar if requested 559 self.backwards(force=force) 560 561 def _updatebar(self, bar, forward=False, ago=0): 562 '''Load a value from the stack onto the lines to form the new bar 563 564 Returns True if values are present, False otherwise 565 ''' 566 if forward: 567 self.forward() 568 569 for line, val in zip(self.itersize(), bar): 570 line[0 + ago] = val 571 572 def _fromstack(self, forward=False, stash=False): 573 '''Load a value from the stack onto the lines to form the new bar 574 575 Returns True if values are present, False otherwise 576 ''' 577 578 coll = self._barstack if not stash else self._barstash 579 580 if coll: 581 if forward: 582 self.forward() 583 584 for line, val in zip(self.itersize(), coll.popleft()): 585 line[0] = val 586 587 return True 588 589 return False 590 591 def resample(self, **kwargs): 592 self.addfilter(Resampler, **kwargs) 593 594 def replay(self, **kwargs): 595 self.addfilter(Replayer, **kwargs) 596 597 598class DataBase(AbstractDataBase): 599 pass 600 601 602class FeedBase(with_metaclass(metabase.MetaParams, object)): 603 params = () + DataBase.params._gettuple() 604 605 def __init__(self): 606 self.datas = list() 607 608 def start(self): 609 for data in self.datas: 610 data.start() 611 612 def stop(self): 613 for data in self.datas: 614 data.stop() 615 616 def getdata(self, dataname, name=None, **kwargs): 617 for pname, pvalue in self.p._getitems(): 618 kwargs.setdefault(pname, getattr(self.p, pname)) 619 620 kwargs['dataname'] = dataname 621 data = self._getdata(**kwargs) 622 623 data._name = name 624 625 self.datas.append(data) 626 return data 627 628 def _getdata(self, dataname, **kwargs): 629 for pname, pvalue in self.p._getitems(): 630 kwargs.setdefault(pname, getattr(self.p, pname)) 631 632 kwargs['dataname'] = dataname 633 return self.DataCls(**kwargs) 634 635 636class MetaCSVDataBase(DataBase.__class__): 637 def dopostinit(cls, _obj, *args, **kwargs): 638 # Before going to the base class to make sure it overrides the default 639 if not _obj.p.name and not _obj._name: 640 _obj._name, _ = os.path.splitext(os.path.basename(_obj.p.dataname)) 641 642 _obj, args, kwargs = \ 643 super(MetaCSVDataBase, cls).dopostinit(_obj, *args, **kwargs) 644 645 return _obj, args, kwargs 646 647 648class CSVDataBase(with_metaclass(MetaCSVDataBase, DataBase)): 649 ''' 650 Base class for classes implementing CSV DataFeeds 651 652 The class takes care of opening the file, reading the lines and 653 tokenizing them. 654 655 Subclasses do only need to override: 656 657 - _loadline(tokens) 658 659 The return value of ``_loadline`` (True/False) will be the return value 660 of ``_load`` which has been overriden by this base class 661 ''' 662 663 f = None 664 params = (('headers', True), ('separator', ','),) 665 666 def start(self): 667 super(CSVDataBase, self).start() 668 669 if self.f is None: 670 if hasattr(self.p.dataname, 'readline'): 671 self.f = self.p.dataname 672 else: 673 # Let an exception propagate to let the caller know 674 self.f = io.open(self.p.dataname, 'r') 675 676 if self.p.headers: 677 self.f.readline() # skip the headers 678 679 self.separator = self.p.separator 680 681 def stop(self): 682 super(CSVDataBase, self).stop() 683 if self.f is not None: 684 self.f.close() 685 self.f = None 686 687 def preload(self): 688 while self.load(): 689 pass 690 691 self._last() 692 self.home() 693 694 # preloaded - no need to keep the object around - breaks multip in 3.x 695 self.f.close() 696 self.f = None 697 698 def _load(self): 699 if self.f is None: 700 return False 701 702 # Let an exception propagate to let the caller know 703 line = self.f.readline() 704 705 if not line: 706 return False 707 708 line = line.rstrip('\n') 709 linetokens = line.split(self.separator) 710 return self._loadline(linetokens) 711 712 def _getnextline(self): 713 if self.f is None: 714 return None 715 716 # Let an exception propagate to let the caller know 717 line = self.f.readline() 718 719 if not line: 720 return None 721 722 line = line.rstrip('\n') 723 linetokens = line.split(self.separator) 724 return linetokens 725 726 727class CSVFeedBase(FeedBase): 728 params = (('basepath', ''),) + CSVDataBase.params._gettuple() 729 730 def _getdata(self, dataname, **kwargs): 731 return self.DataCls(dataname=self.p.basepath + dataname, 732 **self.p._getkwargs()) 733 734 735class DataClone(AbstractDataBase): 736 _clone = True 737 738 def __init__(self): 739 self.data = self.p.dataname 740 self._dataname = self.data._dataname 741 742 # Copy date/session parameters 743 self.p.fromdate = self.p.fromdate 744 self.p.todate = self.p.todate 745 self.p.sessionstart = self.data.p.sessionstart 746 self.p.sessionend = self.data.p.sessionend 747 748 self.p.timeframe = self.data.p.timeframe 749 self.p.compression = self.data.p.compression 750 751 def _start(self): 752 # redefine to copy data bits from guest data 753 self.start() 754 755 # Copy tz infos 756 self._tz = self.data._tz 757 self.lines.datetime._settz(self._tz) 758 759 self._calendar = self.data._calendar 760 761 # input has already been converted by guest data 762 self._tzinput = None # no need to further converr 763 764 # Copy dates/session infos 765 self.fromdate = self.data.fromdate 766 self.todate = self.data.todate 767 768 # FIXME: if removed from guest, remove here too 769 self.sessionstart = self.data.sessionstart 770 self.sessionend = self.data.sessionend 771 772 def start(self): 773 super(DataClone, self).start() 774 self._dlen = 0 775 self._preloading = False 776 777 def preload(self): 778 self._preloading = True 779 super(DataClone, self).preload() 780 self.data.home() # preloading data was pushed forward 781 self._preloading = False 782 783 def _load(self): 784 # assumption: the data is in the system 785 # simply copy the lines 786 if self._preloading: 787 # data is preloaded, we are preloading too, can move 788 # forward until have full bar or data source is exhausted 789 self.data.advance() 790 if len(self.data) > self.data.buflen(): 791 return False 792 793 for line, dline in zip(self.lines, self.data.lines): 794 line[0] = dline[0] 795 796 return True 797 798 # Not preloading 799 if not (len(self.data) > self._dlen): 800 # Data not beyond last seen bar 801 return False 802 803 self._dlen += 1 804 805 for line, dline in zip(self.lines, self.data.lines): 806 line[0] = dline[0] 807 808 return True 809 810 def advance(self, size=1, datamaster=None, ticks=True): 811 self._dlen += size 812 super(DataClone, self).advance(size, datamaster, ticks=ticks) 813