1#!/usr/bin/env python
2# -*- coding: utf-8; py-indent-offset:4 -*-
3###############################################################################
4#
5# Copyright (C) 2015, 2016, 2017 Daniel Rodriguez
6#
7# This program is free software: you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation, either version 3 of the License, or
10# (at your option) any later version.
11#
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program.  If not, see <http://www.gnu.org/licenses/>.
19#
20###############################################################################
21from __future__ import (absolute_import, division, print_function,
22                        unicode_literals)
23
24import collections
25import datetime
26import inspect
27import io
28import os.path
29
30import backtrader as bt
31from backtrader import (date2num, num2date, time2num, TimeFrame, dataseries,
32                        metabase)
33
34from backtrader.utils.py3 import with_metaclass, zip, range, string_types
35from backtrader.utils import tzparse
36from .dataseries import SimpleFilterWrapper
37from .resamplerfilter import Resampler, Replayer
38from .tradingcal import PandasMarketCalendar
39
40
class MetaAbstractDataBase(dataseries.OHLCDateTime.__class__):
    '''Metaclass for data feeds

    Registers every non-aliased, non-private concrete subclass in
    ``_indcol`` and performs the standard pre/post ``__init__`` wiring of
    per-instance attributes (owner feed, notification queue, names,
    session times, date boundaries and filters)
    '''
    # registry of data feed classes, keyed by class name
    _indcol = dict()

    def __init__(cls, name, bases, dct):
        '''
        Class has already been created ... register subclasses
        '''
        # Initialize the class
        super(MetaAbstractDataBase, cls).__init__(name, bases, dct)

        # skip aliases, the abstract 'DataBase' itself and "private"
        # classes whose name starts with an underscore
        if not cls.aliased and \
           name != 'DataBase' and not name.startswith('_'):
            cls._indcol[name] = cls

    def dopreinit(cls, _obj, *args, **kwargs):
        '''Set up instance attributes needed before ``__init__`` runs'''
        _obj, args, kwargs = \
            super(MetaAbstractDataBase, cls).dopreinit(_obj, *args, **kwargs)

        # Find the owner and store it
        _obj._feed = metabase.findowner(_obj, FeedBase)

        _obj.notifs = collections.deque()  # store notifications for cerebro

        _obj._dataname = _obj.p.dataname
        _obj._name = ''
        return _obj, args, kwargs

    def dopostinit(cls, _obj, *args, **kwargs):
        '''Normalize names, session times and from/to dates after
        ``__init__`` and install the configured filters'''
        _obj, args, kwargs = \
            super(MetaAbstractDataBase, cls).dopostinit(_obj, *args, **kwargs)

        # Either set by subclass or the parameter or use the dataname (ticker)
        _obj._name = _obj._name or _obj.p.name
        if not _obj._name and isinstance(_obj.p.dataname, string_types):
            _obj._name = _obj.p.dataname
        _obj._compression = _obj.p.compression
        _obj._timeframe = _obj.p.timeframe

        # sessionstart/sessionend are normalized to datetime.time objects
        if isinstance(_obj.p.sessionstart, datetime.datetime):
            _obj.p.sessionstart = _obj.p.sessionstart.time()

        elif _obj.p.sessionstart is None:
            _obj.p.sessionstart = datetime.time.min

        if isinstance(_obj.p.sessionend, datetime.datetime):
            _obj.p.sessionend = _obj.p.sessionend.time()

        elif _obj.p.sessionend is None:
            # microsecond is 999990 rather than 999999 to avoid precision
            # rounding errors
            _obj.p.sessionend = datetime.time(23, 59, 59, 999990)

        # datetime.datetime is a subclass of datetime.date; the hasattr
        # check on 'hour' tells pure dates apart from full datetimes
        if isinstance(_obj.p.fromdate, datetime.date):
            # a pure date is pushed to the session start of that day
            if not hasattr(_obj.p.fromdate, 'hour'):
                _obj.p.fromdate = datetime.datetime.combine(
                    _obj.p.fromdate, _obj.p.sessionstart)

        if isinstance(_obj.p.todate, datetime.date):
            # push it to the end of the day, or else intraday
            # values before the end of the day would be gone
            if not hasattr(_obj.p.todate, 'hour'):
                _obj.p.todate = datetime.datetime.combine(
                    _obj.p.todate, _obj.p.sessionend)

        _obj._barstack = collections.deque()  # for filter operations
        _obj._barstash = collections.deque()  # for filter operations

        # instantiate filter classes; class-based filters exposing a
        # ``last`` method also get a chance to deliver at the data's end
        _obj._filters = list()
        _obj._ffilters = list()
        for fp in _obj.p.filters:
            if inspect.isclass(fp):
                fp = fp(_obj)
                if hasattr(fp, 'last'):
                    _obj._ffilters.append((fp, [], {}))

            _obj._filters.append((fp, [], {}))

        return _obj, args, kwargs
120
121
class AbstractDataBase(with_metaclass(MetaAbstractDataBase,
                                      dataseries.OHLCDateTime)):
    '''Base class for data feeds

    Implements the bar loading/advancing machinery on top of the
    OHLCDateTime lines, including filter application, from/to date
    clipping, timezone handling, status notifications and the bar
    stack/stash used by filters (e.g. resampling/replaying)
    '''

    params = (
        ('dataname', None),
        ('name', ''),
        ('compression', 1),
        ('timeframe', TimeFrame.Days),
        ('fromdate', None),
        ('todate', None),
        ('sessionstart', None),
        ('sessionend', None),
        ('filters', []),
        ('tz', None),
        ('tzinput', None),
        ('qcheck', 0.0),  # timeout in seconds (float) to check for events
        ('calendar', None),
    )

    # notification status codes delivered through put_notification
    (CONNECTED, DISCONNECTED, CONNBROKEN, DELAYED,
     LIVE, NOTSUBSCRIBED, NOTSUPPORTED_TF, UNKNOWN) = range(8)

    # printable names for the codes above (index aligned with them)
    _NOTIFNAMES = [
        'CONNECTED', 'DISCONNECTED', 'CONNBROKEN', 'DELAYED',
        'LIVE', 'NOTSUBSCRIBED', 'NOTSUPPORTED_TIMEFRAME', 'UNKNOWN']

    @classmethod
    def _getstatusname(cls, status):
        '''Return the printable name of a notification status code'''
        return cls._NOTIFNAMES[status]

    _compensate = None  # other data whose positions this data compensates
    _feed = None  # owning FeedBase (set by the metaclass in dopreinit)
    _store = None  # store this data may belong to

    _clone = False  # True only for DataClone
    _qcheck = 0.0  # effective wait (seconds) for live events, see do_qcheck

    _tmoffset = datetime.timedelta()  # time offset reported by _timeoffset

    # Set to non 0 if resampling/replaying
    resampling = 0
    replaying = 0

    _started = False  # set once _start_finish has completed

    def _start_finish(self):
        # A live feed (for example) may have learnt something about the
        # timezones after the start and that's why the date/time related
        # parameters are converted at this late stage
        # Get the output timezone (if any)
        self._tz = self._gettz()
        # Lines have already been created, set the tz
        self.lines.datetime._settz(self._tz)

        # This should probably be also called from an override-able method
        self._tzinput = bt.utils.date.Localizer(self._gettzinput())

        # Convert user input times to the output timezone (or min/max)
        if self.p.fromdate is None:
            self.fromdate = float('-inf')
        else:
            self.fromdate = self.date2num(self.p.fromdate)

        if self.p.todate is None:
            self.todate = float('inf')
        else:
            self.todate = self.date2num(self.p.todate)

        # FIXME: These two are never used and could be removed
        self.sessionstart = time2num(self.p.sessionstart)
        self.sessionend = time2num(self.p.sessionend)

        # calendar param may be None (use the cerebro-wide calendar), a
        # string (pandas market calendar name) or a calendar instance
        self._calendar = cal = self.p.calendar
        if cal is None:
            self._calendar = self._env._tradingcal
        elif isinstance(cal, string_types):
            self._calendar = PandasMarketCalendar(calendar=cal)

        self._started = True

    def _start(self):
        '''Start the data and finish late (timezone dependent) setup'''
        self.start()

        if not self._started:
            self._start_finish()

    def _timeoffset(self):
        '''Return the time offset to apply to this data's times'''
        return self._tmoffset

    def _getnexteos(self):
        '''Returns the next eos using a trading calendar if available'''
        if self._clone:
            return self.data._getnexteos()

        if not len(self):
            return datetime.datetime.min, 0.0

        dt = self.lines.datetime[0]
        dtime = num2date(dt)
        if self._calendar is None:
            nexteos = datetime.datetime.combine(dtime, self.p.sessionend)
            nextdteos = self.date2num(nexteos)  # localized -> utc-like
            nexteos = num2date(nextdteos)  # utc
            while dtime > nexteos:
                nexteos += datetime.timedelta(days=1)  # already utc-like

            nextdteos = date2num(nexteos)  # -> utc-like

        else:
            # returns times in utc
            _, nexteos = self._calendar.schedule(dtime, self._tz)
            nextdteos = date2num(nexteos)  # nextos is already utc

        return nexteos, nextdteos

    def _gettzinput(self):
        '''Can be overridden by classes to return a timezone for input'''
        return tzparse(self.p.tzinput)

    def _gettz(self):
        '''To be overridden by subclasses which may auto-calculate the
        timezone'''
        return tzparse(self.p.tz)

    def date2num(self, dt):
        '''Convert datetime ``dt`` to the float representation, localizing
        it first if an output timezone is set'''
        if self._tz is not None:
            return date2num(self._tz.localize(dt))

        return date2num(dt)

    def num2date(self, dt=None, tz=None, naive=True):
        '''Convert float ``dt`` (default: current bar's datetime) back to a
        datetime, using ``tz`` or the data's timezone'''
        if dt is None:
            return num2date(self.lines.datetime[0], tz or self._tz, naive)

        return num2date(dt, tz or self._tz, naive)

    def haslivedata(self):
        return False  # must be overridden by those that can

    def do_qcheck(self, onoff, qlapse):
        # if onoff is True the data will wait p.qcheck for incoming live data
        # on its queue.  qlapse (time already elapsed) is discounted
        qwait = self.p.qcheck if onoff else 0.0
        qwait = max(0.0, qwait - qlapse)
        self._qcheck = qwait

    def islive(self):
        '''If this returns True, ``Cerebro`` will deactivate ``preload`` and
        ``runonce`` because a live data source must be fetched tick by tick (or
        bar by bar)'''
        return False

    def put_notification(self, status, *args, **kwargs):
        '''Add arguments to notification queue'''
        # only queue a notification when the status actually changes
        if self._laststatus != status:
            self.notifs.append((status, args, kwargs))
            self._laststatus = status

    def get_notifications(self):
        '''Return the pending "store" notifications'''
        # The background thread could keep on adding notifications. The None
        # mark allows to identify which is the last notification to deliver
        self.notifs.append(None)  # put a mark
        notifs = list()
        while True:
            notif = self.notifs.popleft()
            if notif is None:  # mark is reached
                break
            notifs.append(notif)

        return notifs

    def getfeed(self):
        '''Return the feed this data belongs to (may be None)'''
        return self._feed

    def qbuffer(self, savemem=0, replaying=False):
        '''Propagate memory-saving buffering to the lines; resampling or
        replaying needs an extra buffer slot'''
        extrasize = self.resampling or replaying
        for line in self.lines:
            line.qbuffer(savemem=savemem, extrasize=extrasize)

    def start(self):
        '''Reset the filter stacks and the last seen status'''
        self._barstack = collections.deque()
        self._barstash = collections.deque()
        self._laststatus = self.CONNECTED

    def stop(self):
        '''Called when the data is stopped; meant to be overridden'''
        pass

    def clone(self, **kwargs):
        '''Return a DataClone mirroring this data'''
        return DataClone(dataname=self, **kwargs)

    def copyas(self, _dataname, **kwargs):
        '''Return a DataClone of this data under a different name'''
        d = DataClone(dataname=self, **kwargs)
        d._dataname = _dataname
        d._name = _dataname
        return d

    def setenvironment(self, env):
        '''Keep a reference to the environment'''
        self._env = env

    def getenvironment(self):
        '''Return the environment (cerebro) this data is attached to'''
        return self._env

    def addfilter_simple(self, f, *args, **kwargs):
        '''Add a bare callable as a filter by wrapping it'''
        fp = SimpleFilterWrapper(self, f, *args, **kwargs)
        self._filters.append((fp, fp.args, fp.kwargs))

    def addfilter(self, p, *args, **kwargs):
        '''Add a filter; if ``p`` is a class it is instantiated with this
        data, and registered as a "last" filter if it has a ``last`` method'''
        if inspect.isclass(p):
            pobj = p(self, *args, **kwargs)
            self._filters.append((pobj, [], {}))

            if hasattr(pobj, 'last'):
                self._ffilters.append((pobj, [], {}))

        else:
            self._filters.append((p, args, kwargs))

    def compensate(self, other):
        '''Call it to let the broker know that actions on this asset will
        compensate open positions in another'''

        self._compensate = other

    def _tick_nullify(self):
        # These are the updating prices in case the new bar is "updated"
        # and the length doesn't change like if a replay is happening or
        # a real-time data feed is in use and 1 minutes bars are being
        # constructed with 5 seconds updates
        for lalias in self.getlinealiases():
            if lalias != 'datetime':
                setattr(self, 'tick_' + lalias, None)

        self.tick_last = None

    def _tick_fill(self, force=False):
        # If nothing filled the tick_xxx attributes, the bar is the tick
        alias0 = self._getlinealias(0)
        if force or getattr(self, 'tick_' + alias0, None) is None:
            for lalias in self.getlinealiases():
                if lalias != 'datetime':
                    setattr(self, 'tick_' + lalias,
                            getattr(self.lines, lalias)[0])

            self.tick_last = getattr(self.lines, alias0)[0]

    def advance_peek(self):
        '''Return the datetime of the next (not yet delivered) bar, or
        +inf if no future bar is buffered'''
        if len(self) < self.buflen():
            return self.lines.datetime[1]  # return the future

        return float('inf')  # max date else

    def advance(self, size=1, datamaster=None, ticks=True):
        '''Move the data pointer forward ``size`` bars, synchronizing
        against ``datamaster`` if given'''
        if ticks:
            self._tick_nullify()

        # Need intercepting this call to support datas with
        # different lengths (timeframes)
        self.lines.advance(size)

        if datamaster is not None:
            if len(self) > self.buflen():
                # if no bar can be delivered, fill with an empty bar
                self.rewind()
                self.lines.forward()
                return

            if self.lines.datetime[0] > datamaster.lines.datetime[0]:
                self.lines.rewind()
            else:
                if ticks:
                    self._tick_fill()
        elif len(self) < self.buflen():
            # a resampler may have advance us past the last point
            if ticks:
                self._tick_fill()

    def next(self, datamaster=None, ticks=True):
        '''Deliver the next bar, loading it on demand if not preloaded.

        Returns the result of ``load`` when no bar was buffered, else True
        '''

        if len(self) >= self.buflen():
            if ticks:
                self._tick_nullify()

            # not preloaded - request next bar
            ret = self.load()
            if not ret:
                # if load cannot produce bars - forward the result
                return ret

            if datamaster is None:
                # bar is there and no master ... return load's result
                if ticks:
                    self._tick_fill()
                return ret
        else:
            self.advance(ticks=ticks)

        # a bar is "loaded" or was preloaded - index has been moved to it
        if datamaster is not None:
            # there is a time reference to check against
            if self.lines.datetime[0] > datamaster.lines.datetime[0]:
                # can't deliver new bar, too early, go back
                self.rewind()
            else:
                if ticks:
                    self._tick_fill()

        else:
            if ticks:
                self._tick_fill()

        # tell the world there is a bar (either the new or the previous
        return True

    def preload(self):
        '''Load all available bars up-front and rewind to the start'''
        while self.load():
            pass

        self._last()
        self.home()

    def _last(self, datamaster=None):
        '''Give "last" filters a final chance to deliver bars.

        Returns True if any filter delivered something
        '''
        ret = 0
        for ff, fargs, fkwargs in self._ffilters:
            ret += ff.last(self, *fargs, **fkwargs)

        doticks = False
        if datamaster is not None and self._barstack:
            doticks = True

        while self._fromstack(forward=True):
            # consume bar(s) produced by "last"s - adding room
            pass

        if doticks:
            self._tick_fill()

        return bool(ret)

    def _check(self, forcedata=None):
        '''Let filters which support it run their ``check`` method'''
        # NOTE: ret is accumulated-by-convention but not returned/updated
        ret = 0
        for ff, fargs, fkwargs in self._filters:
            if not hasattr(ff, 'check'):
                continue
            ff.check(self, _forcedata=forcedata, *fargs, **fkwargs)

    def load(self):
        '''Load the next bar (from the stack, the stash or ``_load``),
        apply timezone conversion, date clipping and filters.

        Returns True if a bar was delivered, False when the feed is over,
        or the (possibly None) result of ``_load`` for live feeds
        '''
        while True:
            # move data pointer forward for new bar
            self.forward()

            if self._fromstack():  # bar is available
                return True

            if not self._fromstack(stash=True):
                _loadret = self._load()
                if not _loadret:  # no bar use force to make sure in exactbars
                    # the pointer is undone this covers especially (but not
                    # uniquely) the case in which the last bar has been seen
                    # and a backwards would ruin pointer accounting in the
                    # "stop" method of the strategy
                    self.backwards(force=True)  # undo data pointer

                    # return the actual returned value which may be None to
                    # signal no bar is available, but the data feed is not
                    # done. False means game over
                    return _loadret

            # Get a reference to current loaded time
            dt = self.lines.datetime[0]

            # A bar has been loaded, adapt the time
            if self._tzinput:
                # Input has been converted at face value but it's not UTC in
                # the input stream
                dtime = num2date(dt)  # get it in a naive datetime
                # localize it
                dtime = self._tzinput.localize(dtime)  # pytz compatible-ized
                self.lines.datetime[0] = dt = date2num(dtime)  # keep UTC val

            # Check standard date from/to filters
            if dt < self.fromdate:
                # discard loaded bar and carry on
                self.backwards()
                continue
            if dt > self.todate:
                # discard loaded bar and break out
                self.backwards(force=True)
                break

            # Pass through filters
            retff = False
            for ff, fargs, fkwargs in self._filters:
                # previous filter may have put things onto the stack
                if self._barstack:
                    for i in range(len(self._barstack)):
                        self._fromstack(forward=True)
                        retff = ff(self, *fargs, **fkwargs)
                else:
                    retff = ff(self, *fargs, **fkwargs)

                if retff:  # bar removed from system
                    break  # out of the inner loop

            if retff:  # bar removed from system - loop to get new bar
                continue  # in the greater loop

            # Checks let the bar through ... notify it
            return True

        # Out of the loop ... no more bars or past todate
        return False

    def _load(self):
        '''To be overridden by subclasses to actually fetch a bar'''
        return False

    def _add2stack(self, bar, stash=False):
        '''Saves given bar (list of values) to the stack for later retrieval'''
        if not stash:
            self._barstack.append(bar)
        else:
            self._barstash.append(bar)

    def _save2stack(self, erase=False, force=False, stash=False):
        '''Saves current bar to the bar stack for later retrieval

        Parameter ``erase`` determines removal from the data stream
        '''
        bar = [line[0] for line in self.itersize()]
        if not stash:
            self._barstack.append(bar)
        else:
            self._barstash.append(bar)

        if erase:  # remove bar if requested
            self.backwards(force=force)

    def _updatebar(self, bar, forward=False, ago=0):
        '''Load the given bar (list of values) onto the lines at offset
        ``ago``, optionally moving the pointer forward first
        '''
        if forward:
            self.forward()

        for line, val in zip(self.itersize(), bar):
            line[0 + ago] = val

    def _fromstack(self, forward=False, stash=False):
        '''Load a value from the stack onto the lines to form the new bar

        Returns True if values are present, False otherwise
        '''

        coll = self._barstack if not stash else self._barstash

        if coll:
            if forward:
                self.forward()

            for line, val in zip(self.itersize(), coll.popleft()):
                line[0] = val

            return True

        return False

    def resample(self, **kwargs):
        '''Attach a Resampler filter to this data'''
        self.addfilter(Resampler, **kwargs)

    def replay(self, **kwargs):
        '''Attach a Replayer filter to this data'''
        self.addfilter(Replayer, **kwargs)
597
class DataBase(AbstractDataBase):
    '''Concrete base class for data feeds; subclasses are registered in the
    metaclass registry (the name 'DataBase' itself is excluded)'''
    pass
600
601
class FeedBase(with_metaclass(metabase.MetaParams, object)):
    '''Base class for "feeds" which can spawn several data feeds sharing
    common parameters (``DataCls`` is defined by subclasses)'''

    params = () + DataBase.params._gettuple()

    def __init__(self):
        # the data feeds created through getdata
        self.datas = []

    def start(self):
        # start every data feed managed by this feed
        for d in self.datas:
            d.start()

    def stop(self):
        # stop every data feed managed by this feed
        for d in self.datas:
            d.stop()

    def getdata(self, dataname, name=None, **kwargs):
        '''Create, register and return a data feed for ``dataname``.

        Feed-level parameters act as defaults for any keyword not
        explicitly passed; ``name`` is assigned to the created data
        '''
        for pname, _ in self.p._getitems():
            kwargs.setdefault(pname, getattr(self.p, pname))

        kwargs['dataname'] = dataname
        data = self._getdata(**kwargs)
        data._name = name
        self.datas.append(data)
        return data

    def _getdata(self, dataname, **kwargs):
        '''Instantiate ``DataCls`` with feed parameters as defaults'''
        for pname, _ in self.p._getitems():
            kwargs.setdefault(pname, getattr(self.p, pname))

        kwargs['dataname'] = dataname
        return self.DataCls(**kwargs)
634
635
class MetaCSVDataBase(DataBase.__class__):
    '''Metaclass for CSV data feeds which derives a default data name from
    the file name when none was given'''

    def dopostinit(cls, _obj, *args, **kwargs):
        # Before going to the base class to make sure it overrides the default
        if not _obj.p.name and not _obj._name:
            # use the file's basename (without extension) as the name
            _obj._name, _ = os.path.splitext(os.path.basename(_obj.p.dataname))

        _obj, args, kwargs = \
            super(MetaCSVDataBase, cls).dopostinit(_obj, *args, **kwargs)

        return _obj, args, kwargs
646
647
class CSVDataBase(with_metaclass(MetaCSVDataBase, DataBase)):
    '''
    Base class for classes implementing CSV DataFeeds

    The class takes care of opening the file, reading the lines and
    tokenizing them.

    Subclasses do only need to override:

      - _loadline(tokens)

    The return value of ``_loadline`` (True/False) will be the return value
    of ``_load`` which has been overridden by this base class
    '''

    f = None  # file-like object being read
    params = (('headers', True), ('separator', ','),)

    def start(self):
        '''Open the file (or use the passed file-like object) and skip the
        header line if configured to do so'''
        super(CSVDataBase, self).start()

        if self.f is None:
            if hasattr(self.p.dataname, 'readline'):
                # dataname is already a file-like object
                self.f = self.p.dataname
            else:
                # Let an exception propagate to let the caller know
                self.f = io.open(self.p.dataname, 'r')

        if self.p.headers:
            self.f.readline()  # skip the headers

        self.separator = self.p.separator

    def stop(self):
        '''Close the file (if open) after stopping the base class'''
        super(CSVDataBase, self).stop()
        if self.f is not None:
            self.f.close()
            self.f = None

    def preload(self):
        '''Load the entire file, rewind and release the file object'''
        while self.load():
            pass

        self._last()
        self.home()

        # preloaded - no need to keep the object around - breaks multip in 3.x
        # guard against a never-opened file to avoid AttributeError
        if self.f is not None:
            self.f.close()
            self.f = None

    def _load(self):
        '''Read/tokenize the next line and pass it to ``_loadline``.

        Returns False at end of file or if no file is open
        '''
        # delegate reading/tokenizing to _getnextline instead of
        # duplicating its logic here
        linetokens = self._getnextline()
        if linetokens is None:
            return False

        return self._loadline(linetokens)

    def _getnextline(self):
        '''Return the next line split into tokens, or None at end of file
        or if no file is open'''
        if self.f is None:
            return None

        # Let an exception propagate to let the caller know
        line = self.f.readline()

        if not line:
            return None

        line = line.rstrip('\n')
        linetokens = line.split(self.separator)
        return linetokens
725
726
class CSVFeedBase(FeedBase):
    '''Base feed for CSV data classes; prepends ``basepath`` to the
    requested ``dataname`` when instantiating ``DataCls``'''

    params = (('basepath', ''),) + CSVDataBase.params._gettuple()

    def _getdata(self, dataname, **kwargs):
        # Fill in feed-level parameters as defaults but honor any per-call
        # keyword overrides (the previous implementation silently discarded
        # **kwargs and always used the feed parameters)
        for pname, pvalue in self.p._getitems():
            kwargs.setdefault(pname, getattr(self.p, pname))

        kwargs['dataname'] = self.p.basepath + dataname
        return self.DataCls(**kwargs)
733
734
class DataClone(AbstractDataBase):
    '''A data feed which mirrors another data feed (``p.dataname``),
    copying its bars either during preloading or bar by bar'''

    _clone = True

    def __init__(self):
        self.data = self.p.dataname
        self._dataname = self.data._dataname

        # Copy session/timeframe parameters from the cloned data.
        # NOTE: fromdate/todate are kept as passed in the parameters; the
        # effective fromdate/todate values are copied from the guest data
        # in _start anyway (the previous self-assignments of p.fromdate and
        # p.todate were no-ops and have been removed)
        self.p.sessionstart = self.data.p.sessionstart
        self.p.sessionend = self.data.p.sessionend

        self.p.timeframe = self.data.p.timeframe
        self.p.compression = self.data.p.compression

    def _start(self):
        # redefine to copy data bits from guest data
        self.start()

        # Copy tz infos
        self._tz = self.data._tz
        self.lines.datetime._settz(self._tz)

        self._calendar = self.data._calendar

        # input has already been converted by guest data
        self._tzinput = None  # no need to further convert

        # Copy dates/session infos
        self.fromdate = self.data.fromdate
        self.todate = self.data.todate

        # FIXME: if removed from guest, remove here too
        self.sessionstart = self.data.sessionstart
        self.sessionend = self.data.sessionend

    def start(self):
        super(DataClone, self).start()
        self._dlen = 0  # how many bars of the guest data have been seen
        self._preloading = False

    def preload(self):
        self._preloading = True
        super(DataClone, self).preload()
        self.data.home()  # preloading data was pushed forward
        self._preloading = False

    def _load(self):
        '''Copy the current bar of the guest data onto our lines.

        Returns True if a bar was copied, False if the guest data has no
        new bar to offer
        '''
        # assumption: the data is in the system
        # simply copy the lines
        if self._preloading:
            # data is preloaded, we are preloading too, can move
            # forward until have full bar or data source is exhausted
            self.data.advance()
            if len(self.data) > self.data.buflen():
                return False

            for line, dline in zip(self.lines, self.data.lines):
                line[0] = dline[0]

            return True

        # Not preloading
        if not (len(self.data) > self._dlen):
            # Data not beyond last seen bar
            return False

        self._dlen += 1

        for line, dline in zip(self.lines, self.data.lines):
            line[0] = dline[0]

        return True

    def advance(self, size=1, datamaster=None, ticks=True):
        # keep the guest-data length counter in sync when advancing
        self._dlen += size
        super(DataClone, self).advance(size, datamaster, ticks=ticks)
813