from datetime import datetime

from loguru import logger

from flexget import plugin
from flexget.components.backlog.db import BacklogEntry, clear_entries, get_entries
from flexget.event import event
from flexget.manager import Session
from flexget.utils.database import with_session
from flexget.utils.serialization import serialize
from flexget.utils.tools import parse_timedelta

logger = logger.bind(name='backlog')


class InputBacklog:
    """
    Keeps task history for given amount of time.

    Example::

      backlog: 4 days

    Rarely useful for end users, mainly used by other plugins.
    """

    schema = {'type': 'string', 'format': 'interval'}

    @plugin.priority(plugin.PRIORITY_LAST)
    def on_task_input(self, task, config):
        """Return stored backlog entries that the task does not already contain.

        When ``config`` is set (backlog explicitly enabled for the task), the
        task's current entries are also stored, using ``config`` as the
        retention interval.
        """
        # Collect the stored entries first, before anything new is learned.
        restored = self.get_injections(task)
        if config:
            # Backlog was manually enabled for this task: remember its entries.
            self.learn_backlog(task, config)
        return restored

    @plugin.priority(plugin.PRIORITY_FIRST)
    def on_task_metainfo(self, task, config):
        """Attach a serialized snapshot to every entry before the metainfo phase.

        The snapshot is kept under ``_backlog_snapshot`` so it can be stored
        later if the entry has to be written to the backlog.
        """
        for item in task.entries:
            item['_backlog_snapshot'] = serialize(item)

    def on_task_abort(self, task, config):
        """Store all current entries until the next execution when a task aborts."""
        if not task.entries:
            return
        logger.debug('Remembering all entries to backlog because of task abort.')
        self.learn_backlog(task)

    @with_session
    def add_backlog(self, task, entry, amount='', session=None):
        """Store a single entry in the backlog of ``task``.

        :param task: Task the entry belongs to; its name keys the backlog row.
        :param entry: Entry to remember.
        :param amount: Interval string giving how long to keep the entry. When
            left empty the entry will only be injected on the next execution.
        :param session: Database session used for the lookup and the insert.
        """
        snapshot = entry.get('_backlog_snapshot')
        snapshot_missing = not snapshot
        # Not having a snapshot is normal during input phase, so only warn
        # when it is missing in any other phase.
        if snapshot_missing and task.current_phase != 'input':
            logger.warning(
                'No input snapshot available for `{}`, using current state', entry['title']
            )
        if snapshot_missing:
            snapshot = serialize(entry)

        expire_time = datetime.now() + parse_timedelta(amount)
        title = entry['title']

        # Look for a row already stored for this title within this task.
        lookup = session.query(BacklogEntry)
        lookup = lookup.filter(BacklogEntry.title == title)
        lookup = lookup.filter(BacklogEntry.task == task.name)
        existing = lookup.first()

        if not existing:
            logger.debug('Saving {}', title)
            record = BacklogEntry()
            record.title = title
            record.entry = snapshot
            record.task = task.name
            record.expire = expire_time
            session.add(record)
            return

        # A row already exists: only ever push its expiry further out.
        if existing.expire < expire_time:
            logger.debug('Updating expiry time for {}', title)
            existing.expire = expire_time

    def learn_backlog(self, task, amount=''):
        """Store every current task entry in the backlog.

        All task inputs must have been executed before this is called.
        """
        with Session() as db_session:
            for item in task.entries:
                self.add_backlog(task, item, amount, session=db_session)

    @with_session
    def get_injections(self, task, session=None):
        """Collect stored entries missing from the task, then purge expired rows.

        :return: List of backlog entries that are not already in the task.
        """
        restored = []
        for stored in get_entries(task=task.name, session=session):
            candidate = stored.entry
            already_present = task.find_entry(title=candidate['title'], url=candidate['url'])
            if not already_present:
                logger.debug('Restoring {}', candidate['title'])
                restored.append(candidate)

        if restored:
            logger.verbose('Added {} entries from backlog', len(restored))

        # Purge expired rows only after the stored entries have been collected.
        purged = clear_entries(task=task.name, all=False, session=session)
        logger.debug('{} entries purged from backlog', purged)

        return restored


@event('plugin.register')
def register_plugin():
    """Register ``InputBacklog`` as the builtin ``backlog`` plugin (API version 2)."""
    plugin.register(InputBacklog, 'backlog', builtin=True, api_ver=2)